Repository: reef Updated Branches: refs/heads/master b0626ee47 -> b8d2bad86
[REEF-1686] Create a variant of YarnClientConfiguration for Unmanaged AM Summary of changes: * Create Driver and Client configuration modules for the Unmanaged AM app, and add extra parameters required for that mode; * Implement Unmanaged AM app submission functionality; * Make `YarnDriverConfigurationProviderImpl` class public so that Unmanaged AM configuration can refer to it. JIRA: [REEF-1686](https://issues.apache.org/jira/browse/REEF-1686) This closes #1243 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b8d2bad8 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b8d2bad8 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b8d2bad8 Branch: refs/heads/master Commit: b8d2bad8643bf3bbbee27bbd20d8799fc2046967 Parents: b0626ee Author: Sergiy Matusevych <[email protected]> Authored: Wed Jan 25 16:25:35 2017 -0800 Committer: Julia Wang <[email protected]> Committed: Mon Jan 30 19:36:47 2017 -0800 ---------------------------------------------------------------------- .../YarnDriverConfigurationProviderImpl.java | 5 +- .../yarn/client/parameters/RootFolder.java | 35 +++++ .../UnmanagedAmYarnClientConfiguration.java | 72 ++++++++++ .../UnmanagedAmYarnDriverConfiguration.java | 93 +++++++++++++ .../UnmanagedAmYarnJobSubmissionHandler.java | 117 ++++++++++++++++ .../UnmanagedAmYarnSubmissionHelper.java | 139 +++++++++++++++++++ .../client/unmanaged/UnmanagedDriverFiles.java | 78 +++++++++++ .../yarn/client/unmanaged/package-info.java | 22 +++ 8 files changed, 560 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java index 5509cd1..37f6634 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java @@ -18,6 +18,7 @@ */ package org.apache.reef.runtime.yarn.client; +import org.apache.reef.annotations.audience.Private; import org.apache.reef.runtime.common.client.DriverConfigurationProvider; import org.apache.reef.runtime.common.parameters.JVMHeapSlack; import org.apache.reef.runtime.yarn.driver.RuntimeIdentifier; @@ -33,7 +34,9 @@ import static org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration.*; /** * Default driver configuration provider for yarn runtime. */ -final class YarnDriverConfigurationProviderImpl implements DriverConfigurationProvider { +@Private +public final class YarnDriverConfigurationProviderImpl implements DriverConfigurationProvider { + private final double jvmSlack; @Inject http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java new file mode 100644 index 0000000..5f27fc9 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.client.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * Directory in the local filesystem to hold resources required to run the driver and submit evaluators. + */ +@NamedParameter(doc = "Folder to hold resources of the Unmanaged Driver", + default_value = RootFolder.DEFAULT_VALUE, short_name = "root_folder") +public final class RootFolder implements Name<String> { + + public static final String DEFAULT_VALUE = "REEF_LOCAL_RUNTIME"; + + /** Empty private constructor to prohibit instantiation of the utility class. */ + private RootFolder() { } +} http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java new file mode 100644 index 0000000..7862ad9 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.client.unmanaged; + +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.Public; +import org.apache.reef.client.parameters.DriverConfigurationProviders; +import org.apache.reef.driver.parameters.DriverIsUnmanaged; +import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration; +import org.apache.reef.runtime.common.client.DriverConfigurationProvider; +import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; +import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; +import org.apache.reef.runtime.yarn.YarnClasspathProvider; +import org.apache.reef.runtime.yarn.client.YarnDriverConfigurationProviderImpl; +import org.apache.reef.runtime.yarn.client.parameters.JobPriority; +import org.apache.reef.runtime.yarn.client.parameters.JobQueue; +import org.apache.reef.runtime.yarn.client.parameters.RootFolder; +import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; +import org.apache.reef.tang.ConfigurationProvider; +import org.apache.reef.tang.formats.*; +import org.apache.reef.util.logging.LoggingSetup; + +/** + * A ConfigurationModule for the YARN resource manager. + */ +@Public +@ClientSide +public class UnmanagedAmYarnClientConfiguration extends ConfigurationModuleBuilder { + + static { + LoggingSetup.setupCommonsLogging(); + } + + public static final OptionalParameter<String> YARN_QUEUE_NAME = new OptionalParameter<>(); + public static final OptionalParameter<Integer> YARN_PRIORITY = new OptionalParameter<>(); + public static final OptionalParameter<String> ROOT_FOLDER = new OptionalParameter<>(); + + /** Configuration provides whose Configuration will be merged into all Driver Configuration. */ + public static final OptionalImpl<ConfigurationProvider> DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>(); + + public static final ConfigurationModule CONF = new UnmanagedAmYarnClientConfiguration() + .merge(CommonRuntimeConfiguration.CONF) + .bindNamedParameter(DriverIsUnmanaged.class, "true") + // Bind YARN + .bindImplementation(JobSubmissionHandler.class, UnmanagedAmYarnJobSubmissionHandler.class) + .bindImplementation(DriverConfigurationProvider.class, YarnDriverConfigurationProviderImpl.class) + // Bind the parameters given by the user + .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME) + .bindNamedParameter(JobPriority.class, YARN_PRIORITY) + .bindNamedParameter(RootFolder.class, ROOT_FOLDER) + .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) + // Bind external constructors. Taken from YarnExternalConstructors.registerClientConstructors + .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class) + .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS) + .build(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java new file mode 100644 index 0000000..2a57a97 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.client.unmanaged; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Public; +import org.apache.reef.driver.parameters.DriverIsUnmanaged; +import org.apache.reef.io.TempFileCreator; +import org.apache.reef.io.WorkingDirectoryTempFileCreator; +import org.apache.reef.runtime.common.driver.api.*; +import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; +import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes; +import org.apache.reef.runtime.common.driver.parameters.EvaluatorTimeout; +import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; +import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; +import org.apache.reef.runtime.common.launch.REEFErrorHandler; +import org.apache.reef.runtime.common.launch.REEFMessageCodec; +import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID; +import org.apache.reef.runtime.common.launch.parameters.LaunchID; +import org.apache.reef.runtime.common.parameters.JVMHeapSlack; +import org.apache.reef.runtime.yarn.YarnClasspathProvider; +import org.apache.reef.runtime.yarn.driver.*; +import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory; +import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod; +import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; +import org.apache.reef.tang.formats.*; +import org.apache.reef.wake.remote.RemoteConfiguration; + +/** + * Build configuration for REEF driver running in unmanaged mode under the YARN resource manager. + */ +@Public +@DriverSide +public final class UnmanagedAmYarnDriverConfiguration extends ConfigurationModuleBuilder { + + public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>(); + public static final RequiredParameter<String> JOB_SUBMISSION_DIRECTORY = new RequiredParameter<>(); + + public static final OptionalParameter<Integer> YARN_HEARTBEAT_INTERVAL = new OptionalParameter<>(); + public static final OptionalImpl<RackNameFormatter> RACK_NAME_FORMATTER = new OptionalImpl<>(); + public static final OptionalParameter<Long> EVALUATOR_TIMEOUT = new OptionalParameter<>(); + public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>(); + public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>(); + + public static final ConfigurationModule CONF = + new UnmanagedAmYarnDriverConfiguration() + .bindNamedParameter(LaunchID.class, JOB_IDENTIFIER) + .bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER) + .bindNamedParameter(JobSubmissionDirectory.class, JOB_SUBMISSION_DIRECTORY) + // REEF client parameters + .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_UNMANAGED_DRIVER") + .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class) + .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class) + // YARN runtime + .bindNamedParameter(DriverIsUnmanaged.class, "true") + .bindImplementation(ResourceLaunchHandler.class, YARNResourceLaunchHandler.class) + .bindImplementation(ResourceReleaseHandler.class, YARNResourceReleaseHandler.class) + .bindImplementation(ResourceRequestHandler.class, YarnResourceRequestHandler.class) + .bindImplementation(ResourceManagerStartHandler.class, YARNRuntimeStartHandler.class) + .bindImplementation(ResourceManagerStopHandler.class, YARNRuntimeStopHandler.class) + .bindConstructor(YarnConfiguration.class, YarnConfigurationConstructor.class) + .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class) + .bindNamedParameter(YarnHeartbeatPeriod.class, YARN_HEARTBEAT_INTERVAL) + // AbstractDriverRuntime parameters + .bindNamedParameter(EvaluatorTimeout.class, EVALUATOR_TIMEOUT) + .bindNamedParameter(ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER) + .bindNamedParameter(ErrorHandlerRID.class, CLIENT_REMOTE_IDENTIFIER) + .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK) + .bindImplementation(RackNameFormatter.class, RACK_NAME_FORMATTER) + .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) + .bindNamedParameter(DefinedRuntimes.class, RuntimeIdentifier.RUNTIME_NAME) + .build(); + + /** Cannot instantiate this utility class. */ + private UnmanagedAmYarnDriverConfiguration() { } +} http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java new file mode 100644 index 0000000..d75a2b5 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.client.unmanaged; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; +import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; +import org.apache.reef.runtime.yarn.client.SecurityTokenProvider; +import org.apache.reef.runtime.yarn.client.parameters.JobQueue; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.InjectionException; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +@Private +@ClientSide +final class UnmanagedAmYarnJobSubmissionHandler implements JobSubmissionHandler { + + private static final Logger LOG = Logger.getLogger(UnmanagedAmYarnJobSubmissionHandler.class.getName()); + + private final String defaultQueueName; + private final UnmanagedDriverFiles driverFiles; + private final UnmanagedAmYarnSubmissionHelper submissionHelper; + + private String applicationId = null; + + @Inject + private UnmanagedAmYarnJobSubmissionHandler( + @Parameter(JobQueue.class) final String defaultQueueName, + final UnmanagedDriverFiles driverFiles, + final YarnConfiguration yarnConfiguration, + final SecurityTokenProvider tokenProvider) throws IOException { + + this.defaultQueueName = defaultQueueName; + this.driverFiles = driverFiles; + + try { + this.submissionHelper = new UnmanagedAmYarnSubmissionHelper(yarnConfiguration, tokenProvider); + } catch (final IOException | YarnException ex) { + LOG.log(Level.SEVERE, "Cannot create YARN client", ex); + throw new RuntimeException("Cannot create YARN client", ex); + } + } + + @Override + public void close() { + this.submissionHelper.close(); + } + + @Override + public void onNext(final JobSubmissionEvent jobSubmissionEvent) { + + final String jobId = jobSubmissionEvent.getIdentifier(); + LOG.log(Level.FINEST, "Submitting UNMANAGED AM job: {0}", jobSubmissionEvent); + + try { + this.driverFiles.copyGlobalsFrom(jobSubmissionEvent); + + this.submissionHelper + .setApplicationName(jobId) + .setPriority(jobSubmissionEvent.getPriority().orElse(0)) + .setQueue(getQueue(jobSubmissionEvent)) + .submit(); + + this.applicationId = this.submissionHelper.getStringApplicationId(); + LOG.log(Level.FINER, "Submitted UNMANAGED AM job with ID {0} :: {1}", new String[] {jobId, this.applicationId}); + + } catch (final IOException | YarnException ex) { + throw new RuntimeException("Unable to submit UNMANAGED Driver to YARN: " + jobId, ex); + } + } + + /** + * Get the RM application ID. + * Return null if the application has not been submitted yet, or was submitted unsuccessfully. + * @return string application ID or null if no app has been submitted yet. + */ + @Override + public String getApplicationId() { + return this.applicationId; + } + + /** + * Extract the queue name from the jobSubmissionEvent or return default if none is set. + */ + private String getQueue(final JobSubmissionEvent jobSubmissionEvent) { + try { + return Tang.Factory.getTang().newInjector( + jobSubmissionEvent.getConfiguration()).getNamedInstance(JobQueue.class); + } catch (final InjectionException e) { + return this.defaultQueueName; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java new file mode 100644 index 0000000..d25dbad --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.client.unmanaged; + +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.reef.runtime.yarn.client.SecurityTokenProvider; +import org.apache.reef.runtime.yarn.client.UserCredentialSecurityTokenProvider; +import org.apache.reef.runtime.yarn.util.YarnTypes; + +import java.io.IOException; +import java.util.Collections; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Helper code that wraps the YARN Client API for our purposes. + */ +final class UnmanagedAmYarnSubmissionHelper implements AutoCloseable { + + private static final Logger LOG = Logger.getLogger(UnmanagedAmYarnSubmissionHelper.class.getName()); + + private final SecurityTokenProvider tokenProvider; + private final YarnClient yarnClient; + private final ApplicationSubmissionContext applicationSubmissionContext; + private final ApplicationId applicationId; + + UnmanagedAmYarnSubmissionHelper(final YarnConfiguration yarnConfiguration, + final SecurityTokenProvider tokenProvider) throws IOException, YarnException { + + this.tokenProvider = tokenProvider; + + LOG.log(Level.FINE, "Initializing YARN Client"); + this.yarnClient = YarnClient.createYarnClient(); + this.yarnClient.init(yarnConfiguration); + this.yarnClient.start(); + LOG.log(Level.FINE, "Initialized YARN Client"); + + LOG.log(Level.FINE, "Requesting UNMANAGED Application ID from YARN."); + + final ContainerLaunchContext launchContext = YarnTypes.getContainerLaunchContext( + Collections.<String>emptyList(), Collections.<String, LocalResource>emptyMap(), tokenProvider.getTokens()); + + final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication(); + + this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext(); + this.applicationSubmissionContext.setAMContainerSpec(launchContext); + this.applicationSubmissionContext.setUnmanagedAM(true); + + this.applicationId = this.applicationSubmissionContext.getApplicationId(); + + LOG.log(Level.INFO, "YARN UNMANAGED Application ID: {0}", this.applicationId); + } + + /** + * @return the application ID assigned by YARN. + */ + String getStringApplicationId() { + return this.applicationId.toString(); + } + + /** + * Set the name of the application to be submitted. + * @param applicationName YARN application name - a human-readable string. + * @return reference to self for chain calls. + */ + UnmanagedAmYarnSubmissionHelper setApplicationName(final String applicationName) { + this.applicationSubmissionContext.setApplicationName(applicationName); + return this; + } + + /** + * Set the priority of the job. + * @param priority YARN application priority. + * @return reference to self for chain calls. + */ + UnmanagedAmYarnSubmissionHelper setPriority(final int priority) { + this.applicationSubmissionContext.setPriority(Priority.newInstance(priority)); + return this; + } + + /** + * Assign this job submission to a queue. + * @param queueName YARN queue name. + * @return reference to self for chain calls. + */ + UnmanagedAmYarnSubmissionHelper setQueue(final String queueName) { + this.applicationSubmissionContext.setQueue(queueName); + return this; + } + + void submit() throws IOException, YarnException { + + LOG.log(Level.INFO, "Submitting REEF Application with UNMANAGED AM to YARN. ID: {0}", this.applicationId); + this.yarnClient.submitApplication(this.applicationSubmissionContext); + + final Token<AMRMTokenIdentifier> token = this.yarnClient.getAMRMToken(this.applicationId); + this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token)); + } + + @Override + public void close() { + + if (LOG.isLoggable(Level.FINER)) { + try { + final ApplicationReport appReport = this.yarnClient.getApplicationReport(this.applicationId); + LOG.log(Level.FINER, "Application {0} final attempt {1} status: {2}/{3}", new Object[] { + this.applicationId, appReport.getCurrentApplicationAttemptId(), + appReport.getYarnApplicationState(), appReport.getFinalApplicationStatus() }); + } catch (final IOException | YarnException ex) { + LOG.log(Level.WARNING, "Cannot get final status of Unmanaged AM app: " + this.applicationId, ex); + } + } + + LOG.log(Level.FINE, "Closing Unmanaged AM YARN application: {0}", this.applicationId); + this.yarnClient.stop(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java new file mode 100644 index 0000000..e573a61 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.client.unmanaged; + +import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; +import org.apache.reef.runtime.common.files.FileResource; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.yarn.client.parameters.RootFolder; +import org.apache.reef.tang.annotations.Parameter; + +import javax.inject.Inject; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Represents the files added to a driver. + */ +final class UnmanagedDriverFiles { + + private static final Logger LOG = Logger.getLogger(UnmanagedDriverFiles.class.getName()); + + private final String rootFolderName; + private final REEFFileNames fileNames; + + @Inject + private UnmanagedDriverFiles( + @Parameter(RootFolder.class) final String rootFolderName, + final REEFFileNames fileNames) { + + this.rootFolderName = rootFolderName; + this.fileNames = fileNames; + } + + public void copyGlobalsFrom(final JobSubmissionEvent jobSubmissionEvent) throws IOException { + + final File reefGlobalPath = new File(this.rootFolderName, this.fileNames.getGlobalFolderPath()); + if (!reefGlobalPath.exists() && !reefGlobalPath.mkdirs()) { + LOG.log(Level.WARNING, "Failed to create directory: {0}", reefGlobalPath); + throw new RuntimeException("Failed to create directory: " + reefGlobalPath); + } + + reefGlobalPath.deleteOnExit(); + + for (final FileResource fileResource : jobSubmissionEvent.getGlobalFileSet()) { + + final File sourceFile = new File(fileResource.getPath()); + final File destinationFile = new File(reefGlobalPath, sourceFile.getName()); + LOG.log(Level.FINEST, "Copy file: {0} -> {1}", new Object[] {sourceFile, destinationFile}); + + try { + Files.createSymbolicLink(destinationFile.toPath(), sourceFile.toPath()); + } catch (final IOException ex) { + LOG.log(Level.FINER, "Can't symlink file " + sourceFile + ", copying instead.", ex); + Files.copy(sourceFile.toPath(), destinationFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java new file mode 100644 index 0000000..ec6f0d8 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Client-side event handlers for YARN resourcemanager and AM running in unmanaged mode. + */ +package org.apache.reef.runtime.yarn.client.unmanaged;
