Repository: reef Updated Branches: refs/heads/master 6542e1cfb -> bfe2354be
[REEF-1212] Make Org.Apache.REEF.Client.Yarn.IJobResourceUploader interface async This addresses the issue by * Making the interface async * Making IJavaClientLauncher async and using it in the legacy uploader * Fixing usages and tests JIRA: [REEF-1212](https://issues.apache.org/jira/browse/REEF-1212) Pull Request: This closes #870 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bfe2354b Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bfe2354b Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bfe2354b Branch: refs/heads/master Commit: bfe2354be6643e22b3d98658d9abe1545a18506e Parents: 6542e1c Author: Anupam <[email protected]> Authored: Wed Mar 2 13:35:46 2016 -0800 Committer: Markus Weimer <[email protected]> Committed: Thu Mar 3 17:23:31 2016 -0800 ---------------------------------------------------------------------- .../JobResourceUploaderTests.cs | 17 ++++----- .../LegacyJobResourceUploaderTests.cs | 37 ++++++++++---------- .../Common/IJavaClientLauncher.cs | 3 +- .../Common/JavaClientLauncher.cs | 15 +++++--- .../Org.Apache.REEF.Client/Local/LocalClient.cs | 6 ++-- .../YARN/IJobResourceUploader.cs | 5 +-- .../YARN/LegacyJobResourceUploader.cs | 15 ++++---- .../RESTClient/FileSystemJobResourceUploader.cs | 14 ++++---- .../YARN/YARNREEFClient.cs | 4 ++- .../YARN/YarnREEFDotNetClient.cs | 10 ++++-- 10 files changed, 74 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs index cb163f7..ff5bad7 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs +++ 
b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using NSubstitute; using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Client.Yarn; @@ -52,25 +53,25 @@ namespace Org.Apache.REEF.Client.Tests } [Fact] - public void UploadJobResourceCreatesResourceArchive() + public async Task UploadJobResourceCreatesResourceArchive() { var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); - jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); // Archive file generator recieved exactly one call with correct driver local folder path testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath); } [Fact] - public void UploadJobResourceReturnsJobResourceDetails() + public async Task UploadJobResourceReturnsJobResourceDetails() { var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); - var archiveJobResource = jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); - var fileJobResource = jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath); + var archiveJobResource = await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + var fileJobResource = await jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, AnyDriverResourceUploadPath); var jobResources = new List<JobResource> { archiveJobResource, fileJobResource }; foreach (var resource in jobResources) @@ -85,13 +86,13 @@ namespace Org.Apache.REEF.Client.Tests } [Fact] - public void UploadJobResourceMakesCorrectFileSystemCalls() + public async Task 
UploadJobResourceMakesCorrectFileSystemCalls() { var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); - jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); - jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath); + await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + await jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, AnyDriverResourceUploadPath); testContext.FileSystem.Received(1).CreateUriForPath(AnyUploadedResourcePath); http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs index 7cf9c1f..b7977b1 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs +++ b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Threading.Tasks; using NSubstitute; using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Client.Yarn; @@ -37,19 +38,19 @@ namespace Org.Apache.REEF.Client.Tests private const long AnyResourceSize = 53092; [Fact] - public void UploadJobResourceCreatesResourceArchive() + public async Task UploadJobResourceCreatesResourceArchive() { var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); - jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); // Archive file generator recieved exactly one call with correct driver local 
folder path with trailing \ testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath + @"\"); } [Fact] - public void UploadJobResourceJavaLauncherCalledWithCorrectArguments() + public async Task UploadJobResourceJavaLauncherCalledWithCorrectArguments() { var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); @@ -57,55 +58,55 @@ namespace Org.Apache.REEF.Client.Tests var anyLocalJobFilePath = AnyDriverLocalFolderPath.TrimEnd('\\') + @"\job-submission-params.json"; testContext.ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath + @"\") .Returns(anyLocalArchivePath); - jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); - jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath); + await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + await jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, AnyDriverResourceUploadPath); const string javaClassNameForResourceUploader = @"org.apache.reef.bridge.client.JobResourceUploader"; - Guid notUsed; + Guid notUsedGuid; // Clientlauncher is called with correct class name, local archive path, upload path and temp file. - testContext.JavaClientLauncher.Received(1) - .Launch(javaClassNameForResourceUploader, + var notUsedTask = testContext.JavaClientLauncher.Received(1) + .LaunchAsync(javaClassNameForResourceUploader, anyLocalArchivePath, "ARCHIVE", AnyDriverResourceUploadPath + "/", Arg.Is<string>( outputFilePath => Path.GetDirectoryName(outputFilePath) + @"\" == Path.GetTempPath() - && Guid.TryParse(Path.GetFileName(outputFilePath), out notUsed))); + && Guid.TryParse(Path.GetFileName(outputFilePath), out notUsedGuid))); // Clientlauncher is called with correct class name, local job file path, upload path and temp file. 
- testContext.JavaClientLauncher.Received(1) - .Launch(javaClassNameForResourceUploader, + notUsedTask = testContext.JavaClientLauncher.Received(1) + .LaunchAsync(javaClassNameForResourceUploader, anyLocalJobFilePath, "FILE", AnyDriverResourceUploadPath + "/", Arg.Is<string>( outputFilePath => Path.GetDirectoryName(outputFilePath) + @"\" == Path.GetTempPath() - && Guid.TryParse(Path.GetFileName(outputFilePath), out notUsed))); + && Guid.TryParse(Path.GetFileName(outputFilePath), out notUsedGuid))); } [Fact] - public void UploadJobResourceNoFileCreatedByJavaCallThrowsException() + public async Task UploadJobResourceNoFileCreatedByJavaCallThrowsException() { var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(fileExistsReturnValue: false); // throws filenotfound exception - Assert.Throws<FileNotFoundException>(() => jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath)); + await Assert.ThrowsAsync<FileNotFoundException>(() => jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath)); } [Fact] - public void UploadJobResourceReturnsJobResourceDetails() + public async Task UploadJobResourceReturnsJobResourceDetails() { var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); - var jobResources = new List<JobResource>() + var jobResources = new List<JobResource> { - jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath), - jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath) + await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath), + await jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, AnyDriverResourceUploadPath) }; Assert.Equal(jobResources.Count, 2); 
http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/Common/IJavaClientLauncher.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Common/IJavaClientLauncher.cs b/lang/cs/Org.Apache.REEF.Client/Common/IJavaClientLauncher.cs index 293813a..0869c15 100644 --- a/lang/cs/Org.Apache.REEF.Client/Common/IJavaClientLauncher.cs +++ b/lang/cs/Org.Apache.REEF.Client/Common/IJavaClientLauncher.cs @@ -16,6 +16,7 @@ // under the License. using System.Collections.Generic; +using System.Threading.Tasks; using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.Client.Common @@ -26,7 +27,7 @@ namespace Org.Apache.REEF.Client.Common [DefaultImplementation(typeof(JavaClientLauncher))] internal interface IJavaClientLauncher { - void Launch(string javaClassName, params string[] parameters); + Task LaunchAsync(string javaClassName, params string[] parameters); /// <summary> /// Add entries to the end of the classpath of the java client. 
http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs b/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs index de5285b..8110f8e 100644 --- a/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs +++ b/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs @@ -21,6 +21,7 @@ using System.Diagnostics; using System.Globalization; using System.IO; using System.Linq; +using System.Threading.Tasks; using Org.Apache.REEF.Client.API.Exceptions; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities.Diagnostics; @@ -51,7 +52,7 @@ namespace Org.Apache.REEF.Client.Common /// </summary> /// <param name="javaClassName"></param> /// <param name="parameters"></param> - public void Launch(string javaClassName, params string[] parameters) + public Task LaunchAsync(string javaClassName, params string[] parameters) { var startInfo = new ProcessStartInfo { @@ -59,7 +60,7 @@ namespace Org.Apache.REEF.Client.Common FileName = GetJavaCommand(), UseShellExecute = false, RedirectStandardOutput = true, - RedirectStandardError = true + RedirectStandardError = true, }; var msg = string.Format(CultureInfo.CurrentCulture, "Launch Java with command: {0} {1}", @@ -67,8 +68,10 @@ namespace Org.Apache.REEF.Client.Common Logger.Log(Level.Info, msg); var process = Process.Start(startInfo); + var processExitTracker = new TaskCompletionSource<bool>(); if (process != null) { + process.EnableRaisingEvents = true; process.OutputDataReceived += delegate(object sender, DataReceivedEventArgs e) { if (!string.IsNullOrWhiteSpace(e.Data)) @@ -85,16 +88,18 @@ namespace Org.Apache.REEF.Client.Common }; process.BeginErrorReadLine(); process.BeginOutputReadLine(); - process.WaitForExit(); + process.Exited += (sender, args) => { processExitTracker.SetResult(process.ExitCode 
== 0); }; } else { - Exceptions.Throw(new Exception("Java client process didn't start."), Logger); + processExitTracker.SetException(new Exception("Java client process didn't start.")); } + + return processExitTracker.Task; } /// <summary> - /// Assembles the command line arguments. Used by Launch() + /// Assembles the command line arguments. Used by LaunchAsync() /// </summary> /// <param name="javaClassName"></param> /// <param name="parameters"></param> http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs index c6b480b..782b664 100644 --- a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs @@ -157,7 +157,8 @@ namespace Org.Apache.REEF.Client.Local var driverFolder = PrepareDriverFolder(jobRequest); var submissionJobArgsFilePath = CreateBootstrapAvroJobConfig(jobRequest.JobParameters, driverFolder); var submissionAppArgsFilePath = CreateBootstrapAvroAppConfig(jobRequest.AppParameters, driverFolder); - _javaClientLauncher.Launch(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath); + _javaClientLauncher.LaunchAsync(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath) + .GetAwaiter().GetResult(); Logger.Log(Level.Info, "Submitted the Driver for execution."); } @@ -167,7 +168,8 @@ namespace Org.Apache.REEF.Client.Local var submissionJobArgsFilePath = CreateBootstrapAvroJobConfig(jobRequest.JobParameters, driverFolder); var submissionAppArgsFilePath = CreateBootstrapAvroAppConfig(jobRequest.AppParameters, driverFolder); - Task.Run(() => _javaClientLauncher.Launch(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath)); + _javaClientLauncher.LaunchAsync(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath) 
+ .GetAwaiter().GetResult(); var fileName = Path.Combine(driverFolder, _fileNames.DriverHttpEndpoint); JobSubmissionResult result = new LocalJobSubmissionResult(this, fileName); http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs index 8123f2c..93d447c 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +using System.Threading.Tasks; using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.Client.Yarn @@ -28,7 +29,7 @@ namespace Org.Apache.REEF.Client.Yarn /// <param name="driverLocalFolderPath">Local folder where REEF application resources are staged</param> /// <param name="remoteUploadDirectoryPath">Remote directory path where we will upload resources</param> /// <returns>Path, modification time and size of uploaded file as JobResource</returns> - JobResource UploadArchiveResource(string driverLocalFolderPath, string remoteUploadDirectoryPath); + Task<JobResource> UploadArchiveResourceAsync(string driverLocalFolderPath, string remoteUploadDirectoryPath); /// <summary> /// Locates a file resource and uploads it to DFS destination path. 
@@ -36,6 +37,6 @@ namespace Org.Apache.REEF.Client.Yarn /// <param name="fileLocalPath">file path</param> /// <param name="remoteUploadDirectoryPath">Remote directory path where we will upload resources</param> /// <returns>Path, modification time and size of uploaded file as JobResource</returns> - JobResource UploadFileResource(string fileLocalPath, string remoteUploadDirectoryPath); + Task<JobResource> UploadFileResourceAsync(string fileLocalPath, string remoteUploadDirectoryPath); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs index 94fde87..9e7d1f0 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs @@ -16,8 +16,8 @@ // under the License. 
using System; -using System.Collections.Generic; using System.IO; +using System.Threading.Tasks; using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Client.YARN.RestClient.DataModel; using Org.Apache.REEF.Common.Files; @@ -62,24 +62,24 @@ namespace Org.Apache.REEF.Client.Yarn _reefFileNames = reefFileNames; } - public JobResource UploadArchiveResource(string driverLocalFolderPath, string remoteUploadDirectoryPath) + public async Task<JobResource> UploadArchiveResourceAsync(string driverLocalFolderPath, string remoteUploadDirectoryPath) { driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\"; var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/"; Log.Log(Level.Info, "DriverFolderPath: {0} DriverUploadPath: {1}", driverLocalFolderPath, driverUploadPath); var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath); - return GetJobResource(archivePath, ResourceType.ARCHIVE, driverUploadPath, _reefFileNames.GetReefFolderName()); + return await UploadResourceAndGetInfoAsync(archivePath, ResourceType.ARCHIVE, driverUploadPath, _reefFileNames.GetReefFolderName()); } - public JobResource UploadFileResource(string fileLocalPath, string remoteUploadDirectoryPath) + public async Task<JobResource> UploadFileResourceAsync(string fileLocalPath, string remoteUploadDirectoryPath) { var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/"; var jobArgsFilePath = fileLocalPath; - return GetJobResource(jobArgsFilePath, ResourceType.FILE, driverUploadPath); + return await UploadResourceAndGetInfoAsync(jobArgsFilePath, ResourceType.FILE, driverUploadPath); } - private JobResource GetJobResource(string filePath, ResourceType resourceType, string driverUploadPath, string localizedName = null) + private async Task<JobResource> UploadResourceAndGetInfoAsync(string filePath, ResourceType resourceType, string driverUploadPath, string localizedName = null) { if (!_file.Exists(filePath)) { @@ -92,7 +92,7 @@ namespace 
Org.Apache.REEF.Client.Yarn try { - _javaLauncher.Launch(JavaClassNameForResourceUploader, + await _javaLauncher.LaunchAsync(JavaClassNameForResourceUploader, filePath, resourceType.ToString(), driverUploadPath, @@ -119,6 +119,7 @@ namespace Org.Apache.REEF.Client.Yarn Log); } + // Single line file, easier to deal with sync read string fileContent = _file.ReadAllText(resourceDetailsOutputPath).Trim(); Log.Log(Level.Info, "Java uploader returned content: " + fileContent); http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs index 8414475..9647994 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs @@ -17,6 +17,7 @@ using System; using System.IO; +using System.Threading.Tasks; using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Client.Yarn; using Org.Apache.REEF.Client.YARN.RestClient.DataModel; @@ -58,7 +59,7 @@ namespace Org.Apache.REEF.Client.YARN.RestClient _file = file; } - public JobResource UploadArchiveResource(string driverLocalFolderPath, string remoteUploadDirectoryPath) + public async Task<JobResource> UploadArchiveResourceAsync(string driverLocalFolderPath, string remoteUploadDirectoryPath) { driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\"; var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/"; @@ -68,19 +69,19 @@ namespace Org.Apache.REEF.Client.YARN.RestClient _fileSystem.CreateDirectory(parentDirectoryUri); var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath); - return GetJobResource(archivePath, ResourceType.ARCHIVE, 
driverUploadPath, _reefFileNames.GetReefFolderName()); + return await UploadResourceAndGetInfoAsync(archivePath, ResourceType.ARCHIVE, driverUploadPath, _reefFileNames.GetReefFolderName()); } - public JobResource UploadFileResource(string fileLocalPath, string remoteUploadDirectoryPath) + public async Task<JobResource> UploadFileResourceAsync(string fileLocalPath, string remoteUploadDirectoryPath) { var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/"; var parentDirectoryUri = _fileSystem.CreateUriForPath(driverUploadPath); _fileSystem.CreateDirectory(parentDirectoryUri); - return GetJobResource(fileLocalPath, ResourceType.FILE, remoteUploadDirectoryPath); + return await UploadResourceAndGetInfoAsync(fileLocalPath, ResourceType.FILE, remoteUploadDirectoryPath); } - private JobResource GetJobResource(string filePath, ResourceType resourceType, string driverUploadPath, string localizedName = null) + private async Task<JobResource> UploadResourceAndGetInfoAsync(string filePath, ResourceType resourceType, string driverUploadPath, string localizedName = null) { if (!_file.Exists(filePath)) { @@ -94,7 +95,8 @@ namespace Org.Apache.REEF.Client.YARN.RestClient Log.Log(Level.Verbose, @"Copy {0} to {1}", filePath, remoteFileUri); - _fileSystem.CopyFromLocal(filePath, remoteFileUri); + // IFileSystem does not support async APIs; Hence we start the copying in a task + await Task.Run(() => _fileSystem.CopyFromLocal(filePath, remoteFileUri)); var fileStatus = _fileSystem.GetFileStatus(remoteFileUri); return new JobResource http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs index cbf7eb2..7380b7a 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs +++ 
b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs @@ -125,7 +125,9 @@ namespace Org.Apache.REEF.Client.Yarn var submissionAppArgsFilePath = _paramSerializer.SerializeAppFile(jobRequest.AppParameters, paramInjector, driverFolderPath); // Submit the driver - _javaClientLauncher.Launch(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath); + _javaClientLauncher.LaunchAsync(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath) + .GetAwaiter() + .GetResult(); Logger.Log(Level.Info, "Submitted the Driver for execution." + jobRequest.JobIdentifier); } http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs index ae109c1..dbbd840 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs @@ -97,12 +97,18 @@ namespace Org.Apache.REEF.Client.YARN _paramSerializer.SerializeAppFile(jobRequest.AppParameters, paramInjector, localDriverFolderPath); _paramSerializer.SerializeJobFile(jobRequest.JobParameters, localDriverFolderPath, jobSubmissionDirectory); - var archiveResource = _jobResourceUploader.UploadArchiveResource(localDriverFolderPath, jobSubmissionDirectory); + var archiveResource = + _jobResourceUploader.UploadArchiveResourceAsync(localDriverFolderPath, jobSubmissionDirectory) + .GetAwaiter() + .GetResult(); // Path to the job args file. 
var jobArgsFilePath = Path.Combine(localDriverFolderPath, _fileNames.GetJobSubmissionParametersFile()); - var argFileResource = _jobResourceUploader.UploadFileResource(jobArgsFilePath, jobSubmissionDirectory); + var argFileResource = + _jobResourceUploader.UploadFileResourceAsync(jobArgsFilePath, jobSubmissionDirectory) + .GetAwaiter() + .GetResult(); // upload prepared folder to DFS var jobResources = new List<JobResource> { archiveResource, argFileResource };
