http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs index bcff6d2..83e5ff1 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs @@ -16,11 +16,15 @@ // under the License. using System; +using System.Collections.Generic; using System.IO; using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Client.Yarn; +using Org.Apache.REEF.Client.YARN.RestClient.DataModel; +using Org.Apache.REEF.Common.Files; using Org.Apache.REEF.IO.FileSystem; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Client.YARN.RestClient @@ -36,41 +40,72 @@ namespace Org.Apache.REEF.Client.YARN.RestClient private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0); private readonly IResourceArchiveFileGenerator _resourceArchiveFileGenerator; private readonly IFileSystem _fileSystem; + private readonly REEFFileNames _reefFileNames; + private readonly IFile _file; [Inject] private FileSystemJobResourceUploader( IResourceArchiveFileGenerator resourceArchiveFileGenerator, - IFileSystem fileSystem) + IFileSystem fileSystem, + REEFFileNames reefFileNames, + IFile file) { _fileSystem = fileSystem; _resourceArchiveFileGenerator = resourceArchiveFileGenerator; + _reefFileNames = reefFileNames; + _file = file; } - public JobResource UploadJobResource(string driverLocalFolderPath, string jobSubmissionDirectory) + public JobResource UploadArchiveResource(string driverLocalFolderPath, string remoteUploadDirectoryPath) { driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\"; - var driverUploadPath = jobSubmissionDirectory.TrimEnd('/') + @"/"; + var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/"; + var parentDirectoryUri = _fileSystem.CreateUriForPath(remoteUploadDirectoryPath); Log.Log(Level.Verbose, "DriverFolderPath: {0} DriverUploadPath: {1}", driverLocalFolderPath, driverUploadPath); - var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath); + + _fileSystem.CreateDirectory(parentDirectoryUri); - var destinationPath = driverUploadPath + Path.GetFileName(archivePath); - var remoteFileUri = _fileSystem.CreateUriForPath(destinationPath); - Log.Log(Level.Verbose, @"Copy {0} to {1}", archivePath, remoteFileUri); + var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath); + return GetJobResource(archivePath, ResourceType.ARCHIVE, driverUploadPath); + } + public JobResource UploadFileResource(string fileLocalPath, string remoteUploadDirectoryPath) + { + var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/"; var parentDirectoryUri = _fileSystem.CreateUriForPath(driverUploadPath); + _fileSystem.CreateDirectory(parentDirectoryUri); - _fileSystem.CopyFromLocal(archivePath, remoteFileUri); + return GetJobResource(fileLocalPath, ResourceType.FILE, remoteUploadDirectoryPath); + } + + private JobResource GetJobResource(string filePath, ResourceType resourceType, string driverUploadPath) + { + if (!_file.Exists(filePath)) + { + Exceptions.Throw( + new FileNotFoundException("Could not find resource file " + filePath), + Log); + } + + var destinationPath = driverUploadPath + Path.GetFileName(filePath); + var remoteFileUri = _fileSystem.CreateUriForPath(destinationPath); + + Log.Log(Level.Verbose, @"Copy {0} to {1}", filePath, remoteFileUri); + + _fileSystem.CopyFromLocal(filePath, remoteFileUri); var fileStatus = _fileSystem.GetFileStatus(remoteFileUri); return new JobResource { + Name = Path.GetFileNameWithoutExtension(filePath), LastModificationUnixTimestamp = DateTimeToUnixTimestamp(fileStatus.ModificationTime), RemoteUploadPath = remoteFileUri.AbsoluteUri, - ResourceSize = fileStatus.LengthBytes + ResourceSize = fileStatus.LengthBytes, + ResourceType = resourceType }; } - private long DateTimeToUnixTimestamp(DateTime dateTime) + private static long DateTimeToUnixTimestamp(DateTime dateTime) { return (long)(dateTime - Epoch).TotalSeconds; }
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs index 6a14c5c..21e3f7d 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs @@ -91,11 +91,12 @@ namespace Org.Apache.REEF.Client.YARN } sb.Append(" " + LauncherClassName); + sb.Append(" " + _fileNames.GetJobSubmissionParametersFile()); sb.Append(" " + string.Format("{0}/{1}/{2}", _fileNames.GetReefFolderName(), _fileNames.GetLocalFolderName(), - _fileNames.GetJobSubmissionParametersFile())); + _fileNames.GetAppSubmissionParametersFile())); sb.Append(" " + _fileNames.GetDriverLoggingConfigCommand()); return sb.ToString(); } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs index ce228ef..81912ea 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs @@ -21,20 +21,14 @@ using System.IO; using System.Linq; using System.Threading.Tasks; using Org.Apache.REEF.Client.API; -using Org.Apache.REEF.Client.Avro; -using Org.Apache.REEF.Client.Avro.YARN; using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Client.Yarn.RestClient; using Org.Apache.REEF.Client.YARN; -using Org.Apache.REEF.Client.YARN.Parameters; using Org.Apache.REEF.Client.YARN.RestClient.DataModel; -using Org.Apache.REEF.Common.Avro; using Org.Apache.REEF.Common.Files; -using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote.Parameters; namespace Org.Apache.REEF.Client.Yarn { @@ -48,11 +42,9 @@ namespace Org.Apache.REEF.Client.Yarn private static readonly Logger Logger = Logger.GetLogger(typeof(YarnREEFClient)); private readonly DriverFolderPreparationHelper _driverFolderPreparationHelper; private readonly IJavaClientLauncher _javaClientLauncher; - private readonly string _securityTokenKind; - private readonly string _securityTokenService; - private readonly string _jobSubmissionPrefix; private readonly REEFFileNames _fileNames; private readonly IYarnRMClient _yarnClient; + private readonly YarnREEFParamSerializer _paramSerializer; [Inject] internal YarnREEFClient(IJavaClientLauncher javaClientLauncher, @@ -60,18 +52,14 @@ namespace Org.Apache.REEF.Client.Yarn REEFFileNames fileNames, YarnCommandLineEnvironment yarn, IYarnRMClient yarnClient, - [Parameter(typeof(SecurityTokenKindParameter))] string securityTokenKind, - [Parameter(typeof(SecurityTokenServiceParameter))] string securityTokenService, - [Parameter(typeof(JobSubmissionDirectoryPrefixParameter))] string jobSubmissionPrefix) + YarnREEFParamSerializer paramSerializer) { - _jobSubmissionPrefix = jobSubmissionPrefix; - _securityTokenKind = securityTokenKind; - _securityTokenService = securityTokenService; _javaClientLauncher = javaClientLauncher; _javaClientLauncher.AddToClassPath(yarn.GetYarnClasspathList()); _driverFolderPreparationHelper = driverFolderPreparationHelper; _fileNames = fileNames; _yarnClient = yarnClient; + _paramSerializer = paramSerializer; } public void Submit(IJobSubmission jobSubmission) @@ -123,41 +111,12 @@ namespace Org.Apache.REEF.Client.Yarn // TODO: Remove this when we have a generalized way to pass config to java var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray()); - - var avroJobSubmissionParameters = new AvroJobSubmissionParameters - { - jobId = jobSubmission.JobIdentifier, - tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(), - tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(), - tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>(), - jobSubmissionFolder = driverFolderPath - }; - - var avroYarnJobSubmissionParameters = new AvroYarnJobSubmissionParameters - { - driverMemory = jobSubmission.DriverMemory, - driverRecoveryTimeout = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>(), - jobSubmissionDirectoryPrefix = _jobSubmissionPrefix, - sharedJobSubmissionParameters = avroJobSubmissionParameters - }; - - var avroYarnClusterJobSubmissionParameters = new AvroYarnClusterJobSubmissionParameters - { - maxApplicationSubmissions = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions, int>(), - securityTokenKind = _securityTokenKind, - securityTokenService = _securityTokenService, - yarnJobSubmissionParameters = avroYarnJobSubmissionParameters - }; - - var submissionArgsFilePath = Path.Combine(driverFolderPath, _fileNames.GetJobSubmissionParametersFile()); - using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew)) - { - var serializedArgs = AvroJsonSerializer<AvroYarnClusterJobSubmissionParameters>.ToBytes(avroYarnClusterJobSubmissionParameters); - argsFileStream.Write(serializedArgs, 0, serializedArgs.Length); - } + + var submissionJobArgsFilePath = _paramSerializer.SerializeJobFile(jobSubmission, driverFolderPath); + var submissionAppArgsFilePath = _paramSerializer.SerializeAppFile(jobSubmission, paramInjector, driverFolderPath); // Submit the driver - _javaClientLauncher.Launch(JavaClassName, submissionArgsFilePath); + _javaClientLauncher.Launch(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath); Logger.Log(Level.Info, "Submitted the Driver for execution." + jobSubmission.JobIdentifier); } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs index 89dad2b..33ab5b9 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs @@ -16,24 +16,21 @@ // under the License. using System; +using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; using Org.Apache.REEF.Client.API; -using Org.Apache.REEF.Client.Avro; -using Org.Apache.REEF.Client.Avro.YARN; using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Client.Yarn; using Org.Apache.REEF.Client.Yarn.RestClient; using Org.Apache.REEF.Client.YARN.RestClient.DataModel; -using Org.Apache.REEF.Common.Avro; using Org.Apache.REEF.Common.Files; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote.Parameters; namespace Org.Apache.REEF.Client.YARN { @@ -53,6 +50,7 @@ namespace Org.Apache.REEF.Client.YARN private readonly IYarnJobCommandProvider _yarnJobCommandProvider; private readonly REEFFileNames _fileNames; private readonly IJobSubmissionDirectoryProvider _jobSubmissionDirectoryProvider; + private readonly YarnREEFDotNetParamSerializer _paramSerializer; [Inject] private YarnREEFDotNetClient( @@ -61,7 +59,8 @@ namespace Org.Apache.REEF.Client.YARN IJobResourceUploader jobResourceUploader, IYarnJobCommandProvider yarnJobCommandProvider, REEFFileNames fileNames, - IJobSubmissionDirectoryProvider jobSubmissionDirectoryProvider) + IJobSubmissionDirectoryProvider jobSubmissionDirectoryProvider, + YarnREEFDotNetParamSerializer paramSerializer) { _jobSubmissionDirectoryProvider = jobSubmissionDirectoryProvider; _fileNames = fileNames; @@ -69,6 +68,7 @@ namespace Org.Apache.REEF.Client.YARN _jobResourceUploader = jobResourceUploader; _driverFolderPreparationHelper = driverFolderPreparationHelper; _yarnRMClient = yarnRMClient; + _paramSerializer = paramSerializer; } public void Submit(IJobSubmission jobSubmission) @@ -94,40 +94,21 @@ namespace Org.Apache.REEF.Client.YARN // prepare configuration var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray()); - int maxApplicationSubmissions = + var maxApplicationSubmissions = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions, int>(); - var avroJobSubmissionParameters = new AvroJobSubmissionParameters - { - jobId = jobSubmission.JobIdentifier, - tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(), - tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(), - tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>(), - jobSubmissionFolder = localDriverFolderPath - }; - - var avroYarnJobSubmissionParameters = new AvroYarnJobSubmissionParameters - { - driverMemory = jobSubmission.DriverMemory, - driverRecoveryTimeout = - paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>(), - jobSubmissionDirectoryPrefix = jobSubmissionDirectory, - dfsJobSubmissionFolder = jobSubmissionDirectory, - sharedJobSubmissionParameters = avroJobSubmissionParameters - }; - - var submissionArgsFilePath = Path.Combine(localDriverFolderPath, - _fileNames.GetLocalFolderPath(), - _fileNames.GetJobSubmissionParametersFile()); - using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew)) - { - var serializedArgs = - AvroJsonSerializer<AvroYarnJobSubmissionParameters>.ToBytes(avroYarnJobSubmissionParameters); - argsFileStream.Write(serializedArgs, 0, serializedArgs.Length); - } + _paramSerializer.SerializeAppFile(jobSubmission, paramInjector, localDriverFolderPath); + _paramSerializer.SerializeJobFile(jobSubmission, localDriverFolderPath, jobSubmissionDirectory); + + var archiveResource = _jobResourceUploader.UploadArchiveResource(localDriverFolderPath, jobSubmissionDirectory); + + // Path to the job args file. + var jobArgsFilePath = Path.Combine(localDriverFolderPath, _fileNames.GetJobSubmissionParametersFile()); + + var argFileResource = _jobResourceUploader.UploadFileResource(jobArgsFilePath, jobSubmissionDirectory); // upload prepared folder to DFS - var jobResource = _jobResourceUploader.UploadJobResource(localDriverFolderPath, jobSubmissionDirectory); + var jobResources = new List<JobResource> { archiveResource, argFileResource }; // submit job Log.Log(Level.Verbose, @"Assigned application id {0}", applicationId); @@ -135,7 +116,7 @@ namespace Org.Apache.REEF.Client.YARN var submissionReq = CreateApplicationSubmissionRequest(jobSubmission, applicationId, maxApplicationSubmissions, - jobResource); + jobResources); var submittedApplication = _yarnRMClient.SubmitApplicationAsync(submissionReq).GetAwaiter().GetResult(); Log.Log(Level.Info, @"Submitted application {0}", submittedApplication.Id); } @@ -163,13 +144,16 @@ namespace Org.Apache.REEF.Client.YARN IJobSubmission jobSubmission, string appId, int maxApplicationSubmissions, - JobResource jobResource) + IReadOnlyCollection<JobResource> jobResources) { string command = _yarnJobCommandProvider.GetJobSubmissionCommand(); Log.Log(Level.Verbose, "Command for YARN: {0}", command); Log.Log(Level.Verbose, "ApplicationID: {0}", appId); Log.Log(Level.Verbose, "MaxApplicationSubmissions: {0}", maxApplicationSubmissions); - Log.Log(Level.Verbose, "Driver archive location: {0}", jobResource.RemoteUploadPath); + foreach (var jobResource in jobResources) + { + Log.Log(Level.Verbose, "Remote file: {0}", jobResource.RemoteUploadPath); + } var submitApplication = new SubmitApplication { @@ -188,24 +172,7 @@ namespace Org.Apache.REEF.Client.YARN UnmanagedAM = false, AmContainerSpec = new AmContainerSpec { - LocalResources = new LocalResources - { - Entries = new[] - { - new KeyValuePair<string, LocalResourcesValue> - { - Key = _fileNames.GetReefFolderName(), - Value = new LocalResourcesValue - { - Resource = jobResource.RemoteUploadPath, - Type = ResourceType.ARCHIVE, - Visibility = Visibility.APPLICATION, - Size = jobResource.ResourceSize, - Timestamp = jobResource.LastModificationUnixTimestamp - } - } - } - }, + LocalResources = CreateLocalResources(jobResources), Commands = new Commands { Command = command @@ -216,11 +183,30 @@ namespace Org.Apache.REEF.Client.YARN return submitApplication; } + private static LocalResources CreateLocalResources(IEnumerable<JobResource> jobResources) + { + return new LocalResources + { + Entries = jobResources.Select(jobResource => new RestClient.DataModel.KeyValuePair<string, LocalResourcesValue> + { + Key = jobResource.Name, + Value = new LocalResourcesValue + { + Resource = jobResource.RemoteUploadPath, + Type = jobResource.ResourceType, + Visibility = Visibility.APPLICATION, + Size = jobResource.ResourceSize, + Timestamp = jobResource.LastModificationUnixTimestamp + } + }).ToArray() + }; + } + /// <summary> /// Creates the temporary directory to hold the job submission. /// </summary> /// <returns>The path to the folder created.</returns> - private string CreateDriverFolder(string jobId, string appId) + private static string CreateDriverFolder(string jobId, string appId) { return Path.GetFullPath(Path.Combine(Path.GetTempPath(), string.Join("-", "reef", jobId, appId))); } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs new file mode 100644 index 0000000..dd28702 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.IO; +using Org.Apache.REEF.Client.API; +using Org.Apache.REEF.Client.Avro; +using Org.Apache.REEF.Client.Avro.YARN; +using Org.Apache.REEF.Common.Avro; +using Org.Apache.REEF.Common.Files; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Wake.Remote.Parameters; + +namespace Org.Apache.REEF.Client.YARN +{ + /// <summary> + /// Job/application parameter file serializer for <see cref="YarnREEFDotNetClient"/>. + /// </summary> + internal sealed class YarnREEFDotNetParamSerializer + { + private readonly REEFFileNames _fileNames; + + [Inject] + private YarnREEFDotNetParamSerializer(REEFFileNames fileNames) + { + _fileNames = fileNames; + } + + /// <summary> + /// Serializes the application parameters to reef/local/app-submission-params.json. + /// </summary> + internal void SerializeAppFile(IJobSubmission jobSubmission, IInjector paramInjector, string localDriverFolderPath) + { + var serializedArgs = SerializeAppArgsToBytes(jobSubmission, paramInjector, localDriverFolderPath); + + var submissionAppArgsFilePath = Path.Combine( + localDriverFolderPath, _fileNames.GetLocalFolderPath(), _fileNames.GetAppSubmissionParametersFile()); + + using (var jobArgsFileStream = new FileStream(submissionAppArgsFilePath, FileMode.CreateNew)) + { + jobArgsFileStream.Write(serializedArgs, 0, serializedArgs.Length); + } + } + + internal byte[] SerializeAppArgsToBytes(IJobSubmission jobSubmission, IInjector paramInjector, string localDriverFolderPath) + { + var avroAppSubmissionParameters = new AvroAppSubmissionParameters + { + tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(), + tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(), + tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>() + }; + + var avroYarnAppSubmissionParameters = new AvroYarnAppSubmissionParameters + { + sharedAppSubmissionParameters = avroAppSubmissionParameters, + driverMemory = jobSubmission.DriverMemory, + driverRecoveryTimeout = + paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>(), + }; + + return AvroJsonSerializer<AvroYarnAppSubmissionParameters>.ToBytes(avroYarnAppSubmissionParameters); + } + + /// <summary> + /// Serializes the job parameters to job-submission-params.json. + /// </summary> + internal void SerializeJobFile(IJobSubmission jobSubmission, string localDriverFolderPath, string jobSubmissionDirectory) + { + var serializedArgs = SerializeJobArgsToBytes(jobSubmission, localDriverFolderPath, jobSubmissionDirectory); + + var submissionJobArgsFilePath = Path.Combine(localDriverFolderPath, + _fileNames.GetJobSubmissionParametersFile()); + + using (var jobArgsFileStream = new FileStream(submissionJobArgsFilePath, FileMode.CreateNew)) + { + jobArgsFileStream.Write(serializedArgs, 0, serializedArgs.Length); + } + } + + internal byte[] SerializeJobArgsToBytes(IJobSubmission jobSubmission, string localDriverFolderPath, string jobSubmissionDirectory) + { + var avroJobSubmissionParameters = new AvroJobSubmissionParameters + { + jobId = jobSubmission.JobIdentifier, + jobSubmissionFolder = localDriverFolderPath + }; + + var avroYarnJobSubmissionParameters = new AvroYarnJobSubmissionParameters + { + jobSubmissionDirectoryPrefix = jobSubmissionDirectory, + dfsJobSubmissionFolder = jobSubmissionDirectory, + sharedJobSubmissionParameters = avroJobSubmissionParameters + }; + + return AvroJsonSerializer<AvroYarnJobSubmissionParameters>.ToBytes(avroYarnJobSubmissionParameters); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs new file mode 100644 index 0000000..e2278e4 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.IO; +using Org.Apache.REEF.Client.API; +using Org.Apache.REEF.Client.Avro; +using Org.Apache.REEF.Client.Avro.YARN; +using Org.Apache.REEF.Client.Yarn; +using Org.Apache.REEF.Client.YARN.Parameters; +using Org.Apache.REEF.Common.Avro; +using Org.Apache.REEF.Common.Files; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Wake.Remote.Parameters; + +namespace Org.Apache.REEF.Client.YARN +{ + /// <summary> + /// Job/application parameter file serializer for <see cref="YarnREEFClient"/>. + /// </summary> + internal sealed class YarnREEFParamSerializer + { + private readonly REEFFileNames _fileNames; + private readonly string _securityTokenKind; + private readonly string _securityTokenService; + private readonly string _jobSubmissionPrefix; + + [Inject] + private YarnREEFParamSerializer( + REEFFileNames fileNames, + [Parameter(typeof(SecurityTokenKindParameter))] string securityTokenKind, + [Parameter(typeof(SecurityTokenServiceParameter))] string securityTokenService, + [Parameter(typeof(JobSubmissionDirectoryPrefixParameter))] string jobSubmissionPrefix) + { + _fileNames = fileNames; + _jobSubmissionPrefix = jobSubmissionPrefix; + _securityTokenKind = securityTokenKind; + _securityTokenService = securityTokenService; + } + + /// <summary> + /// Serializes the application parameters to reef/local/app-submission-params.json. + /// </summary> + internal string SerializeAppFile(IJobSubmission jobSubmission, IInjector paramInjector, string driverFolderPath) + { + var serializedArgs = SerializeAppArgsToBytes(jobSubmission, paramInjector); + + var submissionArgsFilePath = Path.Combine(driverFolderPath, _fileNames.GetAppSubmissionParametersFile()); + using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew)) + { + argsFileStream.Write(serializedArgs, 0, serializedArgs.Length); + } + + return submissionArgsFilePath; + } + + internal byte[] SerializeAppArgsToBytes(IJobSubmission jobSubmission, IInjector paramInjector) + { + var avroAppSubmissionParameters = new AvroAppSubmissionParameters + { + tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(), + tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(), + tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>() + }; + + var avroYarnAppSubmissionParameters = new AvroYarnAppSubmissionParameters + { + sharedAppSubmissionParameters = avroAppSubmissionParameters, + driverMemory = jobSubmission.DriverMemory, + driverRecoveryTimeout = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>() + }; + + var avroYarnClusterAppSubmissionParameters = new AvroYarnClusterAppSubmissionParameters + { + yarnAppSubmissionParameters = avroYarnAppSubmissionParameters, + maxApplicationSubmissions = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions, int>() + }; + + return AvroJsonSerializer<AvroYarnClusterAppSubmissionParameters>.ToBytes(avroYarnClusterAppSubmissionParameters); + } + + /// <summary> + /// Serializes the job parameters to job-submission-params.json. + /// </summary> + internal string SerializeJobFile(IJobSubmission jobSubmission, string driverFolderPath) + { + var serializedArgs = SerializeJobArgsToBytes(jobSubmission, driverFolderPath); + + var submissionArgsFilePath = Path.Combine(driverFolderPath, _fileNames.GetJobSubmissionParametersFile()); + using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew)) + { + argsFileStream.Write(serializedArgs, 0, serializedArgs.Length); + } + + return submissionArgsFilePath; + } + + internal byte[] SerializeJobArgsToBytes(IJobSubmission jobSubmission, string driverFolderPath) + { + var avroJobSubmissionParameters = new AvroJobSubmissionParameters + { + jobId = jobSubmission.JobIdentifier, + jobSubmissionFolder = driverFolderPath + }; + + var avroYarnJobSubmissionParameters = new AvroYarnJobSubmissionParameters + { + jobSubmissionDirectoryPrefix = _jobSubmissionPrefix, + sharedJobSubmissionParameters = avroJobSubmissionParameters + }; + + var avroYarnClusterJobSubmissionParameters = new AvroYarnClusterJobSubmissionParameters + { + securityTokenKind = _securityTokenKind, + securityTokenService = _securityTokenService, + yarnJobSubmissionParameters = avroYarnJobSubmissionParameters + }; + + return AvroJsonSerializer<AvroYarnClusterJobSubmissionParameters>.ToBytes(avroYarnClusterJobSubmissionParameters); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs index 321342b..94e2e6b 100644 --- a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs +++ b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs @@ -53,6 +53,7 @@ namespace Org.Apache.REEF.Common.Files private const string BRIDGE_EXE_CONFIG_NAME = "Org.Apache.REEF.Bridge.exe.config"; private const string SECURITY_TOKEN_IDENTIFIER_FILE = "SecurityTokenId"; private const string SECURITY_TOKEN_PASSWORD_FILE = "SecurityTokenPwd"; + private const string APP_SUBMISSION_PARAMETERS_FILE = "app-submission-params.json"; private const string JOB_SUBMISSION_PARAMETERS_FILE = "job-submission-params.json"; private const string DRIVER_COMMAND_LOGGING_CONFIG = "1> <LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr"; @@ -243,7 +244,16 @@ namespace Org.Apache.REEF.Common.Files } /// <summary> - /// The Job Submission parameters file that is used to submit a job through Java, + /// The Job Submission application parameters file that is used to submit a job through Java, + /// either directly or via a "bootstrap" method. + /// </summary> + public string GetAppSubmissionParametersFile() + { + return APP_SUBMISSION_PARAMETERS_FILE; + } + + /// <summary> + /// The Job Submission job parameters file that is used to submit a job through Java, /// either directly or via a "bootstrap" method. /// </summary> public string GetJobSubmissionParametersFile() http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc new file mode 100644 index 0000000..30205e1 --- /dev/null +++ b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +[ + { + "namespace": "org.apache.reef.reef.bridge.client.avro", + "type": "record", + "name": "AvroAppSubmissionParameters", + "doc": "General cross-language application submission parameters shared by all runtimes", + "fields": [ + { "name": "tcpBeginPort", "type": "int" }, + { "name": "tcpRangeCount", "type": "int" }, + { "name": "tcpTryCount", "type": "int" } + ] + }, + { + "namespace": "org.apache.reef.reef.bridge.client.avro", + "type": "record", + "name": "AvroLocalAppSubmissionParameters", + "doc": "Cross-language application submission parameters to the Local runtime", + "fields": [ + { "name": "sharedAppSubmissionParameters", "type": "AvroAppSubmissionParameters" }, + { "name": "maxNumberOfConcurrentEvaluators", "type": "int" } + ] + }, + { + "namespace": "org.apache.reef.reef.bridge.client.avro", + "type": "record", + "name": "AvroYarnAppSubmissionParameters", + "doc": "General cross-language application submission parameters to the YARN runtime", + "fields": [ + { "name": "sharedAppSubmissionParameters", "type": "AvroAppSubmissionParameters" }, + { "name": "driverMemory", "type": "int" }, + { "name": "driverRecoveryTimeout", "type": "int" } + ] + }, + { + "namespace": "org.apache.reef.reef.bridge.client.avro", + "type": "record", + "name": "AvroYarnClusterAppSubmissionParameters", + "doc": "Cross-language application submission parameters to the YARN runtime using Hadoop's submission client", + "fields": [ + { "name": "yarnAppSubmissionParameters", "type": "AvroYarnAppSubmissionParameters" }, + { "name": "maxApplicationSubmissions", "type": "int" } + ] + } +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc index fec2431..61b9812 100644 --- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc +++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc @@ -21,34 +21,19 @@ "namespace": "org.apache.reef.reef.bridge.client.avro", "type": "record", "name": "AvroJobSubmissionParameters", - "doc": "General cross-language submission parameters shared by all runtimes", + "doc": "General cross-language job submission parameters shared by all runtimes", "fields": [ { "name": "jobId", "type": "string" }, - { "name": "tcpBeginPort", "type": "int" }, - { "name": "tcpRangeCount", "type": "int" }, - { "name": "tcpTryCount", "type": "int" }, { "name": "jobSubmissionFolder", "type": "string" } ] }, { "namespace": "org.apache.reef.reef.bridge.client.avro", "type": "record", - "name": "AvroLocalJobSubmissionParameters", - "doc": "Cross-language submission parameters to the Local runtime", - "fields": [ - { "name": "sharedJobSubmissionParameters", "type": "AvroJobSubmissionParameters" }, - { "name": "maxNumberOfConcurrentEvaluators", "type": "int" } - ] - }, - { - "namespace": "org.apache.reef.reef.bridge.client.avro", - "type": "record", "name": "AvroYarnJobSubmissionParameters", "doc": "General cross-language submission parameters to the YARN runtime", "fields": [ { "name": "sharedJobSubmissionParameters", "type": "AvroJobSubmissionParameters" }, - { "name": "driverMemory", "type": "int" }, - { "name": "driverRecoveryTimeout", "type": "int" }, { "name": "dfsJobSubmissionFolder", "type": "string", "default": "NULL" }, { "name": "jobSubmissionDirectoryPrefix", "type": "string" } ] @@ -60,7 +45,6 @@ "doc": "Cross-language submission parameters to the YARN runtime using Hadoop's submission client", "fields": [ { "name": "yarnJobSubmissionParameters", "type": "AvroYarnJobSubmissionParameters" }, - { "name": "maxApplicationSubmissions", "type": "int" }, { "name": "securityTokenKind", "type": "string", "default": "NULL" }, { "name": "securityTokenService", "type": "string", "default": "NULL" } ] http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java index 6ad77c7..e03013a 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java @@ -20,6 +20,7 @@ package org.apache.reef.bridge.client; import org.apache.commons.lang.Validate; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.URL; import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; import org.apache.reef.runtime.yarn.YarnClasspathProvider; @@ -44,16 +45,18 @@ public final class JobResourceUploader { /** * This class is invoked from Org.Apache.REEF.Client.Yarn.LegacyJobResourceUploader in .NET code. * Arguments: - * [0] : Local path for already generated archive - * [1] : Path of job submission directory - * [2] : File path for output with details of uploaded resource + * [0] : Local path for file. + * [1] : Type for file. + * [2] : Path of job submission directory + * [3] : File path for output with details of uploaded resource */ public static void main(final String[] args) throws InjectionException, IOException { - Validate.isTrue(args.length == 3, "Job resource uploader requires 3 args"); + Validate.isTrue(args.length == 4, "Job resource uploader requires 4 args"); final File localFile = new File(args[0]); - Validate.isTrue(localFile.exists(), "Local archive does not exist " + localFile.getAbsolutePath()); - final String jobSubmissionDirectory = args[1]; - final String localOutputPath = args[2]; + Validate.isTrue(localFile.exists(), "Local file does not exist " + localFile.getAbsolutePath()); + final String fileType = args[1]; + final String jobSubmissionDirectory = args[2]; + final String localOutputPath = args[3]; LOG.log(Level.INFO, "Received args: LocalPath " + localFile.getAbsolutePath() + " Submission directory " + jobSubmissionDirectory + " LocalOutputPath " + localOutputPath); @@ -66,7 +69,7 @@ public final class JobResourceUploader { .newInjector(configuration) .getInstance(JobUploader.class); final LocalResource localResource = jobUploader.createJobFolder(jobSubmissionDirectory) - .uploadAsLocalResource(localFile); + .uploadAsLocalResource(localFile, LocalResourceType.valueOf(fileType)); // Output: <UploadedPath>;<LastModificationUnixTimestamp>;<ResourceSize> final URL resource = localResource.getResource(); http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java index cc308ab..3c94acc 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java @@ -61,12 +61,19 @@ public final class LocalClient { public static void main(final String[] args) throws IOException, InjectionException { final File jobSubmissionParametersFile = new File(args[0]); + final File localAppSubmissionParametersFile = new File(args[1]); + if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) { throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath()); } + if (!(localAppSubmissionParametersFile.exists() && localAppSubmissionParametersFile.canRead())) { + throw new IOException("Unable to open and read " + localAppSubmissionParametersFile.getAbsolutePath()); + } + final LocalSubmissionFromCS localSubmissionFromCS = - LocalSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile); + LocalSubmissionFromCS.fromSubmissionParameterFiles( + jobSubmissionParametersFile, localAppSubmissionParametersFile); LOG.log(Level.INFO, "Local job submission received from C#: {0}", localSubmissionFromCS); final Configuration runtimeConfiguration = localSubmissionFromCS.getRuntimeConfiguration(); http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java index 93a8f2c..b63e20b 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java @@ -92,13 +92,20 @@ final class LocalRuntimeDriverConfigurationGenerator { } public static void main(final String[] args) throws InjectionException, IOException { - final File jobSubmissionParametersFile = new File(args[0]); + final File localAppSubmissionParametersFile = new File(args[0]); + final File jobSubmissionParametersFile = new File(args[1]); + if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) { throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath()); } + if (!(localAppSubmissionParametersFile.exists() && localAppSubmissionParametersFile.canRead())) { + throw new IOException("Unable to open and read " + localAppSubmissionParametersFile.getAbsolutePath()); + } + final LocalSubmissionFromCS localSubmission = - LocalSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile); + LocalSubmissionFromCS.fromSubmissionParameterFiles( + jobSubmissionParametersFile, localAppSubmissionParametersFile); LOG.log(Level.FINE, "Local driver config generation received from C#: {0}", localSubmission); final Configuration localRuntimeConfiguration = localSubmission.getRuntimeConfiguration(); http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java index e8d8118..cc5bb82 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java @@ -24,8 +24,9 @@ import org.apache.avro.specific.SpecificDatumReader; import org.apache.commons.lang.Validate; import org.apache.reef.client.parameters.DriverConfigurationProviders; import org.apache.reef.io.TcpPortConfigurationProvider; +import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters; import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters; -import org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters; +import org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix; import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; @@ -58,16 +59,18 @@ final class LocalSubmissionFromCS { private final int tcpRangeCount; private final int tcpTryCount; - private LocalSubmissionFromCS(final AvroLocalJobSubmissionParameters avroLocalJobSubmissionParameters) { + private LocalSubmissionFromCS(final AvroJobSubmissionParameters avroJobSubmissionParameters, + final AvroLocalAppSubmissionParameters avroLocalAppSubmissionParameters) { // We assume the given path to be the one of the driver. The job folder is one level up from there. - final AvroJobSubmissionParameters jobSubmissionParameters = - avroLocalJobSubmissionParameters.getSharedJobSubmissionParameters(); - this.driverFolder = new File(jobSubmissionParameters.getJobSubmissionFolder().toString()); - this.jobId = jobSubmissionParameters.getJobId().toString(); - this.maxNumberOfConcurrentEvaluators = avroLocalJobSubmissionParameters.getMaxNumberOfConcurrentEvaluators(); - this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort(); - this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount(); - this.tcpTryCount = jobSubmissionParameters.getTcpTryCount(); + final AvroAppSubmissionParameters appSubmissionParameters = + avroLocalAppSubmissionParameters.getSharedAppSubmissionParameters(); + this.tcpBeginPort = appSubmissionParameters.getTcpBeginPort(); + this.tcpRangeCount = appSubmissionParameters.getTcpRangeCount(); + this.tcpTryCount = appSubmissionParameters.getTcpTryCount(); + this.maxNumberOfConcurrentEvaluators = avroLocalAppSubmissionParameters.getMaxNumberOfConcurrentEvaluators(); + + this.driverFolder = new File(avroJobSubmissionParameters.getJobSubmissionFolder().toString()); + this.jobId = avroJobSubmissionParameters.getJobId().toString(); this.jobFolder = driverFolder.getParentFile(); this.runtimeRootFolder = jobFolder.getParentFile(); @@ -137,16 +140,29 @@ final class LocalSubmissionFromCS { /** * Takes the local job submission configuration file, deserializes it, and creates submission object. */ - static LocalSubmissionFromCS fromJobSubmissionParametersFile(final File localJobSubmissionParametersFile) + static LocalSubmissionFromCS fromSubmissionParameterFiles(final File jobSubmissionParametersFile, + final File localAppSubmissionParametersFile) throws IOException { - try (final FileInputStream fileInputStream = new FileInputStream(localJobSubmissionParametersFile)) { + final AvroLocalAppSubmissionParameters localAppSubmissionParameters; + + final AvroJobSubmissionParameters jobSubmissionParameters; + + try (final FileInputStream fileInputStream = new FileInputStream(jobSubmissionParametersFile)) { final JsonDecoder decoder = DecoderFactory.get().jsonDecoder( - AvroLocalJobSubmissionParameters.getClassSchema(), fileInputStream); - final SpecificDatumReader<AvroLocalJobSubmissionParameters> reader = - new SpecificDatumReader<>(AvroLocalJobSubmissionParameters.class); - final AvroLocalJobSubmissionParameters localJobSubmissionParameters = reader.read(null, decoder); + AvroJobSubmissionParameters.getClassSchema(), fileInputStream); + final SpecificDatumReader<AvroJobSubmissionParameters> reader = + new SpecificDatumReader<>(AvroJobSubmissionParameters.class); + jobSubmissionParameters = reader.read(null, decoder); + } - return new LocalSubmissionFromCS(localJobSubmissionParameters); + try (final FileInputStream fileInputStream = new FileInputStream(localAppSubmissionParametersFile)) { + final JsonDecoder decoder = DecoderFactory.get().jsonDecoder( + AvroLocalAppSubmissionParameters.getClassSchema(), fileInputStream); + final SpecificDatumReader<AvroLocalAppSubmissionParameters> reader = + new SpecificDatumReader<>(AvroLocalAppSubmissionParameters.class); + localAppSubmissionParameters = reader.read(null, decoder); } + + return new LocalSubmissionFromCS(jobSubmissionParameters, localAppSubmissionParameters); } } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java index 23f86ff..c2c5fd3 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java @@ -25,7 +25,9 @@ import org.apache.reef.client.DriverRestartConfiguration; import org.apache.reef.client.parameters.DriverConfigurationProviders; import org.apache.reef.io.TcpPortConfigurationProvider; import org.apache.reef.javabridge.generic.JobDriver; +import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters; import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters; +import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters; import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters; import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; import org.apache.reef.runtime.common.files.REEFFileNames; @@ -63,31 +65,31 @@ final class YarnBootstrapDriverConfigGenerator { this.reefFileNames = reefFileNames; } - public String writeDriverConfigurationFile(final String bootstrapArgsLocation) throws IOException { - final File bootstrapArgsFile = new File(bootstrapArgsLocation); - final AvroYarnJobSubmissionParameters yarnBootstrapArgs = - readYarnJobSubmissionParametersFromFile(bootstrapArgsFile); + public String writeDriverConfigurationFile(final String bootstrapJobArgsLocation, + final String bootstrapAppArgsLocation) throws IOException { + final File bootstrapJobArgsFile = new File(bootstrapJobArgsLocation).getCanonicalFile(); + final File bootstrapAppArgsFile = new File(bootstrapAppArgsLocation); + + final AvroYarnJobSubmissionParameters yarnBootstrapJobArgs = + readYarnJobSubmissionParametersFromFile(bootstrapJobArgsFile); + + final AvroYarnAppSubmissionParameters yarnBootstrapAppArgs = + readYarnAppSubmissionParametersFromFile(bootstrapAppArgsFile); + final String driverConfigPath = reefFileNames.getDriverConfigurationPath(); - this.configurationSerializer.toFile(getYarnDriverConfiguration(yarnBootstrapArgs), + this.configurationSerializer.toFile(getYarnDriverConfiguration(yarnBootstrapJobArgs, yarnBootstrapAppArgs), new File(driverConfigPath)); return driverConfigPath; } static Configuration getYarnDriverConfiguration( - final AvroYarnJobSubmissionParameters yarnJobSubmissionParams) { + final AvroYarnJobSubmissionParameters yarnJobSubmissionParams, + final AvroYarnAppSubmissionParameters yarnAppSubmissionParams) { + final AvroJobSubmissionParameters jobSubmissionParameters = yarnJobSubmissionParams.getSharedJobSubmissionParameters(); - final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder() - .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class) - .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(jobSubmissionParameters.getTcpBeginPort())) - .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(jobSubmissionParameters.getTcpRangeCount())) - .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(jobSubmissionParameters.getTcpTryCount())) - .bindNamedParameter(JobSubmissionDirectoryPrefix.class, - yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString()) - .build(); - final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, yarnJobSubmissionParams.getDfsJobSubmissionFolder().toString()) @@ -96,10 +98,21 @@ final class YarnBootstrapDriverConfigGenerator { .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0) .build(); + final AvroAppSubmissionParameters appSubmissionParams = yarnAppSubmissionParams.getSharedAppSubmissionParameters(); + + final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder() + .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class) + .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(appSubmissionParams.getTcpBeginPort())) + .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(appSubmissionParams.getTcpRangeCount())) + .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(appSubmissionParams.getTcpTryCount())) + .bindNamedParameter(JobSubmissionDirectoryPrefix.class, + yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString()) + .build(); + final Configuration driverConfiguration = Configurations.merge( Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER, yarnDriverConfiguration, providerConfig); - if (yarnJobSubmissionParams.getDriverRecoveryTimeout() > 0) { + if (yarnAppSubmissionParams.getDriverRecoveryTimeout() > 0) { LOG.log(Level.FINE, "Driver restart is enabled."); final Configuration yarnDriverRestartConfiguration = @@ -113,7 +126,7 @@ final class YarnBootstrapDriverConfigGenerator { .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING, JobDriver.DriverRestartRunningTaskHandler.class) .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS, - yarnJobSubmissionParams.getDriverRecoveryTimeout()) + yarnAppSubmissionParams.getDriverRecoveryTimeout()) .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED, JobDriver.DriverRestartCompletedHandler.class) .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED, @@ -126,6 +139,23 @@ final class YarnBootstrapDriverConfigGenerator { return driverConfiguration; } + static AvroYarnAppSubmissionParameters readYarnAppSubmissionParametersFromFile(final File file) + throws IOException { + try (final FileInputStream fileInputStream = new FileInputStream(file)) { + // This is mainly a test hook. + return readYarnAppSubmissionParametersFromInputStream(fileInputStream); + } + } + + static AvroYarnAppSubmissionParameters readYarnAppSubmissionParametersFromInputStream( + final InputStream inputStream) throws IOException { + final JsonDecoder decoder = DecoderFactory.get().jsonDecoder( + AvroYarnAppSubmissionParameters.getClassSchema(), inputStream); + final SpecificDatumReader<AvroYarnAppSubmissionParameters> reader = new SpecificDatumReader<>( + AvroYarnAppSubmissionParameters.class); + return reader.read(null, decoder); + } + static AvroYarnJobSubmissionParameters readYarnJobSubmissionParametersFromFile(final File file) throws IOException { try (final FileInputStream fileInputStream = new FileInputStream(file)) { http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java index 5f6b5c2..223c853 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java @@ -41,9 +41,20 @@ public final class YarnBootstrapREEFLauncher { public static void main(final String[] args) throws IOException, InjectionException { LOG.log(Level.INFO, "Entering BootstrapLauncher.main()."); - if (args.length != 1) { - final String message = "Bootstrap launcher should have a single configuration file input specifying the" + - " job submission parameters to be deserialized to create the YarnDriverConfiguration on the fly."; + if (args.length != 2) { + final StringBuilder sb = new StringBuilder(); + sb.append("[ "); + for (String arg : args) { + sb.append(arg); + sb.append(" "); + } + + sb.append("]"); + + final String message = "Bootstrap launcher should have two configuration file inputs, one specifying the" + + " application submission parameters to be deserialized and the other specifying the job" + + " submission parameters to be deserialized to create the YarnDriverConfiguration on the fly." + + " Current args are " + sb.toString(); throw fatal(message, new IllegalArgumentException(message)); } @@ -51,7 +62,7 @@ public final class YarnBootstrapREEFLauncher { try { final YarnBootstrapDriverConfigGenerator yarnDriverConfigurationGenerator = Tang.Factory.getTang().newInjector().getInstance(YarnBootstrapDriverConfigGenerator.class); - REEFLauncher.main(new String[]{yarnDriverConfigurationGenerator.writeDriverConfigurationFile(args[0])}); + REEFLauncher.main(new String[]{yarnDriverConfigurationGenerator.writeDriverConfigurationFile(args[0], args[1])}); } catch (final Exception exception) { if (!(exception instanceof RuntimeException)) { throw fatal("Failed to initialize configurations.", exception); http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java index 6783aa8..6fa3f83 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java @@ -22,9 +22,7 @@ import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.JsonDecoder; import org.apache.avro.specific.SpecificDatumReader; import org.apache.commons.lang.Validate; -import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters; -import org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters; -import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters; +import org.apache.reef.reef.bridge.client.avro.*; import java.io.File; import java.io.FileInputStream; @@ -56,22 +54,29 @@ final class YarnClusterSubmissionFromCS { private final String tokenKind; private final String tokenService; private final String jobSubmissionDirectoryPrefix; + + private final AvroYarnAppSubmissionParameters yarnAppSubmissionParameters; private final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters; - private YarnClusterSubmissionFromCS(final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) { + private YarnClusterSubmissionFromCS(final AvroYarnClusterAppSubmissionParameters yarnClusterAppSubmissionParameters, + final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) { yarnJobSubmissionParameters = yarnClusterJobSubmissionParameters.getYarnJobSubmissionParameters(); + yarnAppSubmissionParameters = yarnClusterAppSubmissionParameters.getYarnAppSubmissionParameters(); final AvroJobSubmissionParameters jobSubmissionParameters = yarnJobSubmissionParameters.getSharedJobSubmissionParameters(); + final AvroAppSubmissionParameters appSubmissionParameters = + yarnAppSubmissionParameters.getSharedAppSubmissionParameters(); + this.driverFolder = new File(jobSubmissionParameters.getJobSubmissionFolder().toString()); this.jobId = jobSubmissionParameters.getJobId().toString(); - this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort(); - this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount(); - this.tcpTryCount = jobSubmissionParameters.getTcpTryCount(); - this.maxApplicationSubmissions = yarnClusterJobSubmissionParameters.getMaxApplicationSubmissions(); - this.driverRecoveryTimeout = yarnJobSubmissionParameters.getDriverRecoveryTimeout(); - this.driverMemory = yarnJobSubmissionParameters.getDriverMemory(); + this.tcpBeginPort = appSubmissionParameters.getTcpBeginPort(); + this.tcpRangeCount = appSubmissionParameters.getTcpRangeCount(); + this.tcpTryCount = appSubmissionParameters.getTcpTryCount(); + this.maxApplicationSubmissions = yarnClusterAppSubmissionParameters.getMaxApplicationSubmissions(); + this.driverRecoveryTimeout = yarnAppSubmissionParameters.getDriverRecoveryTimeout(); + this.driverMemory = yarnAppSubmissionParameters.getDriverMemory(); this.priority = DEFAULT_PRIORITY; this.queue = DEFAULT_QUEUE; this.tokenKind = yarnClusterJobSubmissionParameters.getSecurityTokenKind().toString(); @@ -173,6 +178,13 @@ final class YarnClusterSubmissionFromCS { } /** + * @return The submission parameters for YARN applications. + */ + AvroYarnAppSubmissionParameters getYarnAppSubmissionParameters() { + return yarnAppSubmissionParameters; + } + + /** * @return The submission parameters for YARN jobs. */ AvroYarnJobSubmissionParameters getYarnJobSubmissionParameters() { @@ -189,21 +201,31 @@ final class YarnClusterSubmissionFromCS { /** * Takes the YARN cluster job submission configuration file, deserializes it, and creates submission object. */ - static YarnClusterSubmissionFromCS fromJobSubmissionParametersFile(final File yarnClusterJobSubmissionParametersFile) + static YarnClusterSubmissionFromCS fromJobSubmissionParametersFile(final File yarnClusterAppSubmissionParametersFile, + final File yarnClusterJobSubmissionParametersFile) throws IOException { - try (final FileInputStream fileInputStream = new FileInputStream(yarnClusterJobSubmissionParametersFile)) { - // this is mainly a test hook - return readYarnClusterSubmissionFromCSFromInputStream(fileInputStream); + try (final FileInputStream appFileInputStream = new FileInputStream(yarnClusterAppSubmissionParametersFile)) { + try (final FileInputStream jobFileInputStream = new FileInputStream(yarnClusterJobSubmissionParametersFile)) { + // this is mainly a test hook + return readYarnClusterSubmissionFromCSFromInputStream(appFileInputStream, jobFileInputStream); + } } } static YarnClusterSubmissionFromCS readYarnClusterSubmissionFromCSFromInputStream( - final InputStream inputStream) throws IOException { - final JsonDecoder decoder = DecoderFactory.get().jsonDecoder( - AvroYarnClusterJobSubmissionParameters.getClassSchema(), inputStream); - final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> reader = new SpecificDatumReader<>( + final InputStream appInputStream, final InputStream jobInputStream) throws IOException { + final JsonDecoder appDecoder = DecoderFactory.get().jsonDecoder( + AvroYarnClusterAppSubmissionParameters.getClassSchema(), appInputStream); + final SpecificDatumReader<AvroYarnClusterAppSubmissionParameters> appReader = new SpecificDatumReader<>( + AvroYarnClusterAppSubmissionParameters.class); + final AvroYarnClusterAppSubmissionParameters yarnClusterAppSubmissionParameters = appReader.read(null, appDecoder); + + final JsonDecoder jobDecoder = DecoderFactory.get().jsonDecoder( + AvroYarnClusterJobSubmissionParameters.getClassSchema(), jobInputStream); + final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> jobReader = new SpecificDatumReader<>( AvroYarnClusterJobSubmissionParameters.class); - final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters = reader.read(null, decoder); - return new YarnClusterSubmissionFromCS(yarnClusterJobSubmissionParameters); + final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters = jobReader.read(null, jobDecoder); + + return new YarnClusterSubmissionFromCS(yarnClusterAppSubmissionParameters, yarnClusterJobSubmissionParameters); } } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java index 9250462..d43ee8d 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java @@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.reef.runtime.common.files.ClasspathProvider; @@ -68,7 +69,7 @@ public final class YarnJobSubmissionClient { private final ClasspathProvider classpath; private final SecurityTokenProvider tokenProvider; private final List<String> commandPrefixList; - private final YarnJobSubmissionParametersFileGenerator jobSubmissionParametersGenerator; + private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator; @Inject YarnJobSubmissionClient(final JobUploader uploader, @@ -78,7 +79,7 @@ public final class YarnJobSubmissionClient { @Parameter(DriverLaunchCommandPrefix.class) final List<String> commandPrefixList, final SecurityTokenProvider tokenProvider, - final YarnJobSubmissionParametersFileGenerator jobSubmissionParametersGenerator) { + final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) { this.uploader = uploader; this.fileNames = fileNames; this.yarnConfiguration = yarnConfiguration; @@ -121,13 +122,24 @@ public final class YarnJobSubmissionClient { // ------------------------------------------------------------------------ // Upload the JAR LOG.info("Uploading job submission JAR"); - final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile); + final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile, LocalResourceType.ARCHIVE); LOG.info("Uploaded job submission JAR"); // ------------------------------------------------------------------------ + // Upload the job file + final LocalResource jobFileOnDFS = jobFolderOnDFS.uploadAsLocalResource( + new File(yarnSubmission.getDriverFolder(), fileNames.getYarnBootstrapJobParamFilePath()), + LocalResourceType.FILE); + + final List<String> confFiles = new ArrayList<>(); + confFiles.add(fileNames.getYarnBootstrapJobParamFilePath()); + confFiles.add(fileNames.getYarnBootstrapAppParamFilePath()); + + // ------------------------------------------------------------------------ // Submit submissionHelper .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS) + .addLocalResource(fileNames.getYarnBootstrapJobParamFilePath(), jobFileOnDFS) .setApplicationName(yarnSubmission.getJobId()) .setDriverMemory(yarnSubmission.getDriverMemory()) .setPriority(yarnSubmission.getPriority()) @@ -135,7 +147,7 @@ public final class YarnJobSubmissionClient { .setMaxApplicationAttempts(yarnSubmission.getMaxApplicationSubmissions()) .setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0) .setLauncherClass(YarnBootstrapREEFLauncher.class) - .setConfigurationFileName(fileNames.getYarnBootstrapParamFilePath()) + .setConfigurationFilePaths(confFiles) .submit(); writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(), submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath()); @@ -221,16 +233,23 @@ public final class YarnJobSubmissionClient { /** * .NET client calls into this main method for job submission. * For arguments detail: - * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File) + * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File, File) */ public static void main(final String[] args) throws InjectionException, IOException, YarnException { final File jobSubmissionParametersFile = new File(args[0]); + final File appSubmissionParametersFile = new File(args[1]); + + if (!(appSubmissionParametersFile.exists() && appSubmissionParametersFile.canRead())) { + throw new IOException("Unable to open and read " + appSubmissionParametersFile.getAbsolutePath()); + } + if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) { throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath()); } final YarnClusterSubmissionFromCS yarnSubmission = - YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile); + YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile( + appSubmissionParametersFile, jobSubmissionParametersFile); LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission); if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) { http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java deleted file mode 100644 index ccb5e8b..0000000 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.bridge.client; - -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.io.JsonEncoder; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters; -import org.apache.reef.runtime.common.files.REEFFileNames; -import org.apache.reef.runtime.yarn.client.uploader.JobFolder; - -import javax.inject.Inject; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.logging.Logger; - -/** - * Does client side manipulation of driver configuration for YARN runtime. - */ -final class YarnJobSubmissionParametersFileGenerator { - private static final Logger LOG = Logger.getLogger(YarnJobSubmissionParametersFileGenerator.class.getName()); - private final REEFFileNames fileNames; - - @Inject - private YarnJobSubmissionParametersFileGenerator(final REEFFileNames fileNames) { - this.fileNames = fileNames; - } - - /** - * Writes driver configuration to disk. - * @param yarnClusterSubmissionFromCS the information needed to submit encode YARN parameters and create the - * YARN job for submission from the cluster. - * @throws IOException - */ - public void writeConfiguration(final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS, - final JobFolder jobFolderOnDFS) throws IOException { - final File yarnParametersFile = new File(yarnClusterSubmissionFromCS.getDriverFolder(), - fileNames.getYarnBootstrapParamFilePath()); - - try (final FileOutputStream fileOutputStream = new FileOutputStream(yarnParametersFile)) { - // this is mainly a test hook. - writeAvroYarnJobSubmissionParametersToOutputStream( - yarnClusterSubmissionFromCS, jobFolderOnDFS.getPath().toString(), fileOutputStream); - } - } - - static void writeAvroYarnJobSubmissionParametersToOutputStream( - final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS, - final String jobFolderOnDFSPath, - final OutputStream outputStream) throws IOException { - final DatumWriter<AvroYarnJobSubmissionParameters> datumWriter = - new SpecificDatumWriter<>(AvroYarnJobSubmissionParameters.class); - - final AvroYarnJobSubmissionParameters jobSubmissionParameters = - yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters(); - jobSubmissionParameters.setDfsJobSubmissionFolder(jobFolderOnDFSPath); - final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(jobSubmissionParameters.getSchema(), - outputStream); - datumWriter.write(jobSubmissionParameters, encoder); - encoder.flush(); - outputStream.flush(); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java new file mode 100644 index 0000000..506ced6 --- /dev/null +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.client; + +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.io.JsonEncoder; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters; +import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.yarn.client.uploader.JobFolder; + +import javax.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.logging.Logger; + +/** + * Does client side manipulation of driver configuration for YARN runtime. + */ +final class YarnSubmissionParametersFileGenerator { + private static final Logger LOG = Logger.getLogger(YarnSubmissionParametersFileGenerator.class.getName()); + private final REEFFileNames fileNames; + + @Inject + private YarnSubmissionParametersFileGenerator(final REEFFileNames fileNames) { + this.fileNames = fileNames; + } + + /** + * Writes driver configuration to disk. + * @param yarnClusterSubmissionFromCS the information needed to submit encode YARN parameters and create the + * YARN job for submission from the cluster. + * @throws IOException + */ + public void writeConfiguration(final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS, + final JobFolder jobFolderOnDFS) throws IOException { + final File yarnAppParametersFile = new File(yarnClusterSubmissionFromCS.getDriverFolder(), + fileNames.getYarnBootstrapAppParamFilePath()); + + final File yarnJobParametersFile = new File(yarnClusterSubmissionFromCS.getDriverFolder(), + fileNames.getYarnBootstrapJobParamFilePath()); + + try (final FileOutputStream appFileOutputStream = new FileOutputStream(yarnAppParametersFile)) { + try (final FileOutputStream jobFileOutputStream = new FileOutputStream(yarnJobParametersFile)) { + // this is mainly a test hook. + writeAvroYarnAppSubmissionParametersToOutputStream(yarnClusterSubmissionFromCS, appFileOutputStream); + writeAvroYarnJobSubmissionParametersToOutputStream( + yarnClusterSubmissionFromCS, jobFolderOnDFS.getPath().toString(), jobFileOutputStream); + } + } + } + + static void writeAvroYarnAppSubmissionParametersToOutputStream( + final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS, + final OutputStream outputStream) throws IOException { + final DatumWriter<AvroYarnAppSubmissionParameters> datumWriter = + new SpecificDatumWriter<>(AvroYarnAppSubmissionParameters.class); + + final AvroYarnAppSubmissionParameters appSubmissionParameters = + yarnClusterSubmissionFromCS.getYarnAppSubmissionParameters(); + final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(appSubmissionParameters.getSchema(), outputStream); + datumWriter.write(appSubmissionParameters, encoder); + encoder.flush(); + outputStream.flush(); + } + + static void writeAvroYarnJobSubmissionParametersToOutputStream( + final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS, + final String jobFolderOnDFSPath, + final OutputStream outputStream) throws IOException { + final DatumWriter<AvroYarnJobSubmissionParameters> datumWriter = + new SpecificDatumWriter<>(AvroYarnJobSubmissionParameters.class); + + final AvroYarnJobSubmissionParameters jobSubmissionParameters = + yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters(); + jobSubmissionParameters.setDfsJobSubmissionFolder(jobFolderOnDFSPath); + final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(jobSubmissionParameters.getSchema(), + outputStream); + datumWriter.write(jobSubmissionParameters, encoder); + encoder.flush(); + outputStream.flush(); + } +}
