Repository: hadoop Updated Branches: refs/heads/branch-3.2 f5e1bad0f -> bee5bf867
YARN-9001. [Submarine] Use AppAdminClient instead of ServiceClient to submit jobs. (Zac Zhou via wangda) Change-Id: I7e8d1c27ebd37e0907ca570c4f3d56fe7a859635 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bee5bf86 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bee5bf86 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bee5bf86 Branch: refs/heads/branch-3.2 Commit: bee5bf867ba6708f67bb368e2e32c5df538a2e70 Parents: f5e1bad Author: Wangda Tan <wan...@apache.org> Authored: Fri Nov 16 10:26:50 2018 -0800 Committer: Wangda Tan <wan...@apache.org> Committed: Fri Nov 16 10:26:50 2018 -0800 ---------------------------------------------------------------------- .../yarn/service/client/ServiceClient.java | 1 + .../submarine/runtimes/common/JobMonitor.java | 6 +++ .../yarnservice/YarnServiceJobMonitor.java | 25 +++++++++--- .../yarnservice/YarnServiceJobSubmitter.java | 42 ++++++++++++++++++-- .../runtimes/yarnservice/YarnServiceUtils.java | 15 +++---- .../yarnservice/TestYarnServiceRunJobCli.java | 12 +++--- .../submarine/common/MockClientContext.java | 1 - 7 files changed, 79 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee5bf86/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 8724a87..e70e96b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -1360,6 +1360,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, LOG.info("Service {} does not have an application ID", serviceName); return appSpec; } + appSpec.setId(currentAppId.toString()); ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId); appSpec.setState(convertState(appReport.getYarnApplicationState())); ApplicationTimeout lifetime = http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee5bf86/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java index c81393b..35e21fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java @@ -48,6 +48,11 @@ public abstract class JobMonitor { throws IOException, YarnException; /** + * Cleanup AppAdminClient, etc. + */ + public void cleanup() throws IOException {} + + /** * Continue wait and print status if job goes to ready or final state. 
* @param jobName * @throws IOException @@ -80,5 +85,6 @@ public abstract class JobMonitor { throw new IOException(e); } } + cleanup(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee5bf86/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java index 94d30b0..c95aa14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java @@ -14,9 +14,10 @@ package org.apache.hadoop.yarn.submarine.runtimes.yarnservice; +import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.submarine.common.ClientContext; import org.apache.hadoop.yarn.submarine.common.api.JobStatus; import org.apache.hadoop.yarn.submarine.common.api.builder.JobStatusBuilder; @@ -25,22 +26,34 @@ import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor; import java.io.IOException; public class YarnServiceJobMonitor extends JobMonitor { - private ServiceClient serviceClient = null; + 
private volatile AppAdminClient serviceClient = null; public YarnServiceJobMonitor(ClientContext clientContext) { super(clientContext); } @Override - public synchronized JobStatus getTrainingJobStatus(String jobName) + public JobStatus getTrainingJobStatus(String jobName) throws IOException, YarnException { if (this.serviceClient == null) { - this.serviceClient = YarnServiceUtils.createServiceClient( - clientContext.getYarnConfig()); + synchronized(this) { + if (this.serviceClient == null) { + this.serviceClient = YarnServiceUtils.createServiceClient( + clientContext.getYarnConfig()); + } + } } - Service serviceSpec = this.serviceClient.getStatus(jobName); + String appStatus=serviceClient.getStatusString(jobName); + Service serviceSpec= ServiceApiUtil.jsonSerDeser.fromJson(appStatus); JobStatus jobStatus = JobStatusBuilder.fromServiceSpec(serviceSpec); return jobStatus; } + + @Override + public void cleanup() throws IOException{ + if (this.serviceClient != null) { + this.serviceClient.close(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee5bf86/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java index d57c675..d9a88a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java @@ -19,6 +19,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.ServiceApiConstants; import org.apache.hadoop.yarn.service.api.records.Artifact; @@ -27,7 +28,7 @@ import org.apache.hadoop.yarn.service.api.records.ConfigFile; import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink; import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters; import org.apache.hadoop.yarn.submarine.common.ClientContext; @@ -53,6 +54,8 @@ import java.util.Set; import java.util.StringTokenizer; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; +import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS; +import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; /** * Submit a job to cluster @@ -530,6 +533,20 @@ public class YarnServiceJobSubmitter implements JobSubmitter { return serviceSpec; } + private String generateServiceSpecFile(Service service) throws IOException { + File serviceSpecFile = File.createTempFile(service.getName(), ".json"); + String buffer = jsonSerDeser.toJson(service); + Writer w = new OutputStreamWriter(new FileOutputStream(serviceSpecFile), + "UTF-8"); + PrintWriter pw = new 
PrintWriter(w); + try { + pw.append(buffer); + } finally { + pw.close(); + } + return serviceSpecFile.getAbsolutePath(); + } + /** * {@inheritDoc} */ @@ -537,13 +554,30 @@ public class YarnServiceJobSubmitter implements JobSubmitter { public ApplicationId submitJob(RunJobParameters parameters) throws IOException, YarnException { createServiceByParameters(parameters); - ServiceClient serviceClient = YarnServiceUtils.createServiceClient( + String serviceSpecFile = generateServiceSpecFile(serviceSpec); + + AppAdminClient appAdminClient = YarnServiceUtils.createServiceClient( clientContext.getYarnConfig()); - ApplicationId appid = serviceClient.actionCreate(serviceSpec); - serviceClient.stop(); + int code = appAdminClient.actionLaunch(serviceSpecFile, + serviceSpec.getName(), null, null); + if(code != EXIT_SUCCESS) { + throw new YarnException("Fail to launch application with exit code:" + + code); + } + + String appStatus=appAdminClient.getStatusString(serviceSpec.getName()); + Service app=ServiceApiUtil.jsonSerDeser.fromJson(appStatus); + if(app.getId() == null) { + throw new YarnException("Can't get application id for Service " + + serviceSpec.getName()); + } + ApplicationId appid = ApplicationId.fromString(app.getId()); + appAdminClient.stop(); return appid; } + + @VisibleForTesting public Service getServiceSpec() { return serviceSpec; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee5bf86/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java index d69840a..2591a00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java @@ -16,8 +16,8 @@ package org.apache.hadoop.yarn.submarine.runtimes.yarnservice; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.submarine.common.Envs; import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs; import org.slf4j.Logger; @@ -26,27 +26,28 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import static org.apache.hadoop.yarn.client.api.AppAdminClient.DEFAULT_TYPE; + public class YarnServiceUtils { private static final Logger LOG = LoggerFactory.getLogger(YarnServiceUtils.class); // This will be true only in UT. 
- private static ServiceClient stubServiceClient = null; + private static AppAdminClient stubServiceClient = null; - public static ServiceClient createServiceClient( + public static AppAdminClient createServiceClient( Configuration yarnConfiguration) { if (stubServiceClient != null) { return stubServiceClient; } - ServiceClient serviceClient = new ServiceClient(); - serviceClient.init(yarnConfiguration); - serviceClient.start(); + AppAdminClient serviceClient = AppAdminClient.createAppAdminClient( + DEFAULT_TYPE, yarnConfiguration); return serviceClient; } @VisibleForTesting - public static void setStubServiceClient(ServiceClient stubServiceClient) { + public static void setStubServiceClient(AppAdminClient stubServiceClient) { YarnServiceUtils.stubServiceClient = stubServiceClient; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee5bf86/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java index 89d39a0..4391030 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java @@ -19,12 +19,11 @@ package org.apache.hadoop.yarn.submarine.client.cli.yarnservice; import 
com.google.common.collect.ImmutableMap; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli; import org.apache.hadoop.yarn.submarine.common.MockClientContext; import org.apache.hadoop.yarn.submarine.common.api.TaskType; @@ -45,6 +44,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Map; +import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -53,9 +53,11 @@ public class TestYarnServiceRunJobCli { @Before public void before() throws IOException, YarnException { SubmarineLogs.verboseOff(); - ServiceClient serviceClient = mock(ServiceClient.class); - when(serviceClient.actionCreate(any(Service.class))).thenReturn( - ApplicationId.newInstance(1234L, 1)); + AppAdminClient serviceClient = mock(AppAdminClient.class); + when(serviceClient.actionLaunch(any(String.class), any(String.class), + any(Long.class), any(String.class))).thenReturn(EXIT_SUCCESS); + when(serviceClient.getStatusString(any(String.class))).thenReturn( + "{\"id\": \"application_1234_1\"}"); YarnServiceUtils.setStubServiceClient(serviceClient); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee5bf86/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java index 5c06ddc..b59c01e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java @@ -22,7 +22,6 @@ import org.apache.hadoop.yarn.submarine.common.fs.MockRemoteDirectoryManager; import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import java.io.IOException; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org