Repository: reef
Updated Branches:
  refs/heads/master 6542e1cfb -> bfe2354be


[REEF-1212] Make Org.Apache.REEF.Client.Yarn.IJobResourceUploader interface async
This addresses the issue by:

  * Making the IJobResourceUploader interface async
  * Making IJavaClientLauncher async and using it in the legacy uploader
  * Fixing usages and tests

JIRA:
 [REEF-1212](https://issues.apache.org/jira/browse/REEF-1212)

Pull Request:
  This closes #870


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bfe2354b
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bfe2354b
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bfe2354b

Branch: refs/heads/master
Commit: bfe2354be6643e22b3d98658d9abe1545a18506e
Parents: 6542e1c
Author: Anupam <[email protected]>
Authored: Wed Mar 2 13:35:46 2016 -0800
Committer: Markus Weimer <[email protected]>
Committed: Thu Mar 3 17:23:31 2016 -0800

----------------------------------------------------------------------
 .../JobResourceUploaderTests.cs                 | 17 ++++-----
 .../LegacyJobResourceUploaderTests.cs           | 37 ++++++++++----------
 .../Common/IJavaClientLauncher.cs               |  3 +-
 .../Common/JavaClientLauncher.cs                | 15 +++++---
 .../Org.Apache.REEF.Client/Local/LocalClient.cs |  6 ++--
 .../YARN/IJobResourceUploader.cs                |  5 +--
 .../YARN/LegacyJobResourceUploader.cs           | 15 ++++----
 .../RESTClient/FileSystemJobResourceUploader.cs | 14 ++++----
 .../YARN/YARNREEFClient.cs                      |  4 ++-
 .../YARN/YarnREEFDotNetClient.cs                | 10 ++++--
 10 files changed, 74 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs 
b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
index cb163f7..ff5bad7 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
@@ -18,6 +18,7 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading.Tasks;
 using NSubstitute;
 using Org.Apache.REEF.Client.Common;
 using Org.Apache.REEF.Client.Yarn;
@@ -52,25 +53,25 @@ namespace Org.Apache.REEF.Client.Tests
         }
 
         [Fact]
-        public void UploadJobResourceCreatesResourceArchive()
+        public async Task UploadJobResourceCreatesResourceArchive()
         {
             var testContext = new TestContext();
             var jobResourceUploader = testContext.GetJobResourceUploader();
 
-            
jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath);
+            await 
jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath);
 
             // Archive file generator recieved exactly one call with correct 
driver local folder path
             
testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath);
         }
 
         [Fact]
-        public void UploadJobResourceReturnsJobResourceDetails()
+        public async Task UploadJobResourceReturnsJobResourceDetails()
         {
             var testContext = new TestContext();
             var jobResourceUploader = testContext.GetJobResourceUploader();
 
-            var archiveJobResource = 
jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath);
-            var fileJobResource = 
jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, 
AnyDriverResourceUploadPath);
+            var archiveJobResource = await 
jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath);
+            var fileJobResource = await 
jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, 
AnyDriverResourceUploadPath);
             var jobResources = new List<JobResource> { archiveJobResource, 
fileJobResource };
 
             foreach (var resource in jobResources)
@@ -85,13 +86,13 @@ namespace Org.Apache.REEF.Client.Tests
         }
 
         [Fact]
-        public void UploadJobResourceMakesCorrectFileSystemCalls()
+        public async Task UploadJobResourceMakesCorrectFileSystemCalls()
         {
             var testContext = new TestContext();
             var jobResourceUploader = testContext.GetJobResourceUploader();
 
-            
jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath);
-            jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, 
AnyDriverResourceUploadPath);
+            await 
jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath);
+            await 
jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, 
AnyDriverResourceUploadPath);
 
             
testContext.FileSystem.Received(1).CreateUriForPath(AnyUploadedResourcePath);
 

http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs 
b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
index 7cf9c1f..b7977b1 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
@@ -18,6 +18,7 @@
 using System;
 using System.Collections.Generic;
 using System.IO;
+using System.Threading.Tasks;
 using NSubstitute;
 using Org.Apache.REEF.Client.Common;
 using Org.Apache.REEF.Client.Yarn;
@@ -37,19 +38,19 @@ namespace Org.Apache.REEF.Client.Tests
         private const long AnyResourceSize = 53092;
 
         [Fact]
-        public void UploadJobResourceCreatesResourceArchive()
+        public async Task UploadJobResourceCreatesResourceArchive()
         {
             var testContext = new TestContext();
             var jobResourceUploader = testContext.GetJobResourceUploader();
             
-            
jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath);
+            await 
jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath);
 
             // Archive file generator recieved exactly one call with correct 
driver local folder path with trailing \
             
testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath
 + @"\");
         }
 
         [Fact]
-        public void UploadJobResourceJavaLauncherCalledWithCorrectArguments()
+        public async Task 
UploadJobResourceJavaLauncherCalledWithCorrectArguments()
         {
             var testContext = new TestContext();
             var jobResourceUploader = testContext.GetJobResourceUploader();
@@ -57,55 +58,55 @@ namespace Org.Apache.REEF.Client.Tests
             var anyLocalJobFilePath = AnyDriverLocalFolderPath.TrimEnd('\\') + 
@"\job-submission-params.json";
             
testContext.ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath
 + @"\")
                 .Returns(anyLocalArchivePath);
-            
jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath);
-            jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, 
AnyDriverResourceUploadPath);
+            await 
jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath);
+            await 
jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, 
AnyDriverResourceUploadPath);
 
             const string javaClassNameForResourceUploader = 
@"org.apache.reef.bridge.client.JobResourceUploader";
-            Guid notUsed;
+            Guid notUsedGuid;
 
             // Clientlauncher is called with correct class name, local archive 
path, upload path and temp file.
-            testContext.JavaClientLauncher.Received(1)
-                .Launch(javaClassNameForResourceUploader,
+            var notUsedTask = testContext.JavaClientLauncher.Received(1)
+                .LaunchAsync(javaClassNameForResourceUploader,
                     anyLocalArchivePath,
                     "ARCHIVE",
                     AnyDriverResourceUploadPath + "/",
                     Arg.Is<string>(
                         outputFilePath =>
                             Path.GetDirectoryName(outputFilePath) + @"\" == 
Path.GetTempPath() 
-                            && Guid.TryParse(Path.GetFileName(outputFilePath), 
out notUsed)));
+                            && Guid.TryParse(Path.GetFileName(outputFilePath), 
out notUsedGuid)));
 
             // Clientlauncher is called with correct class name, local job 
file path, upload path and temp file.
-            testContext.JavaClientLauncher.Received(1)
-                .Launch(javaClassNameForResourceUploader,
+            notUsedTask = testContext.JavaClientLauncher.Received(1)
+                .LaunchAsync(javaClassNameForResourceUploader,
                     anyLocalJobFilePath,
                     "FILE",
                     AnyDriverResourceUploadPath + "/",
                     Arg.Is<string>(
                         outputFilePath =>
                             Path.GetDirectoryName(outputFilePath) + @"\" == 
Path.GetTempPath()
-                            && Guid.TryParse(Path.GetFileName(outputFilePath), 
out notUsed)));
+                            && Guid.TryParse(Path.GetFileName(outputFilePath), 
out notUsedGuid)));
         }
 
         [Fact]
-        public void UploadJobResourceNoFileCreatedByJavaCallThrowsException()
+        public async Task 
UploadJobResourceNoFileCreatedByJavaCallThrowsException()
         {
             var testContext = new TestContext();
             var jobResourceUploader = 
testContext.GetJobResourceUploader(fileExistsReturnValue: false);
 
             // throws filenotfound exception
-            Assert.Throws<FileNotFoundException>(() => 
jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath));
+            await Assert.ThrowsAsync<FileNotFoundException>(() => 
jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath));
         }
 
         [Fact]
-        public void UploadJobResourceReturnsJobResourceDetails()
+        public async Task UploadJobResourceReturnsJobResourceDetails()
         {
             var testContext = new TestContext();
             var jobResourceUploader = testContext.GetJobResourceUploader();
 
-            var jobResources = new List<JobResource>()
+            var jobResources = new List<JobResource>
             {
-                
jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath),
-                jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, 
AnyDriverResourceUploadPath)
+                await 
jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, 
AnyDriverResourceUploadPath),
+                await 
jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, 
AnyDriverResourceUploadPath)
             };
 
             Assert.Equal(jobResources.Count, 2);

http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/Common/IJavaClientLauncher.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/IJavaClientLauncher.cs 
b/lang/cs/Org.Apache.REEF.Client/Common/IJavaClientLauncher.cs
index 293813a..0869c15 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/IJavaClientLauncher.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/IJavaClientLauncher.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System.Collections.Generic;
+using System.Threading.Tasks;
 using Org.Apache.REEF.Tang.Annotations;
 
 namespace Org.Apache.REEF.Client.Common
@@ -26,7 +27,7 @@ namespace Org.Apache.REEF.Client.Common
     [DefaultImplementation(typeof(JavaClientLauncher))]
     internal interface IJavaClientLauncher
     {
-        void Launch(string javaClassName, params string[] parameters);
+        Task LaunchAsync(string javaClassName, params string[] parameters);
 
         /// <summary>
         /// Add entries to the end of the classpath of the java client.

http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs 
b/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
index de5285b..8110f8e 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
@@ -21,6 +21,7 @@ using System.Diagnostics;
 using System.Globalization;
 using System.IO;
 using System.Linq;
+using System.Threading.Tasks;
 using Org.Apache.REEF.Client.API.Exceptions;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Diagnostics;
@@ -51,7 +52,7 @@ namespace Org.Apache.REEF.Client.Common
         /// </summary>
         /// <param name="javaClassName"></param>
         /// <param name="parameters"></param>
-        public void Launch(string javaClassName, params string[] parameters)
+        public Task LaunchAsync(string javaClassName, params string[] 
parameters)
         {
             var startInfo = new ProcessStartInfo
             {
@@ -59,7 +60,7 @@ namespace Org.Apache.REEF.Client.Common
                 FileName = GetJavaCommand(),
                 UseShellExecute = false,
                 RedirectStandardOutput = true,
-                RedirectStandardError = true
+                RedirectStandardError = true,
             };
 
             var msg = string.Format(CultureInfo.CurrentCulture, "Launch Java 
with command: {0} {1}",
@@ -67,8 +68,10 @@ namespace Org.Apache.REEF.Client.Common
             Logger.Log(Level.Info, msg);
 
             var process = Process.Start(startInfo);
+            var processExitTracker = new TaskCompletionSource<bool>();
             if (process != null)
             {
+                process.EnableRaisingEvents = true;
                 process.OutputDataReceived += delegate(object sender, 
DataReceivedEventArgs e)
                 {
                     if (!string.IsNullOrWhiteSpace(e.Data))
@@ -85,16 +88,18 @@ namespace Org.Apache.REEF.Client.Common
                 };
                 process.BeginErrorReadLine();
                 process.BeginOutputReadLine();
-                process.WaitForExit();
+                process.Exited += (sender, args) => { 
processExitTracker.SetResult(process.ExitCode == 0); };
             }
             else
             {
-                Exceptions.Throw(new Exception("Java client process didn't 
start."), Logger);
+                processExitTracker.SetException(new Exception("Java client 
process didn't start."));
             }
+
+            return processExitTracker.Task;
         }
 
         /// <summary>
-        /// Assembles the command line arguments. Used by Launch()
+        /// Assembles the command line arguments. Used by LaunchAsync()
         /// </summary>
         /// <param name="javaClassName"></param>
         /// <param name="parameters"></param>

http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs 
b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
index c6b480b..782b664 100644
--- a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
@@ -157,7 +157,8 @@ namespace Org.Apache.REEF.Client.Local
             var driverFolder = PrepareDriverFolder(jobRequest);
             var submissionJobArgsFilePath = 
CreateBootstrapAvroJobConfig(jobRequest.JobParameters, driverFolder);
             var submissionAppArgsFilePath = 
CreateBootstrapAvroAppConfig(jobRequest.AppParameters, driverFolder);
-            _javaClientLauncher.Launch(JavaClassName, 
submissionJobArgsFilePath, submissionAppArgsFilePath);
+            _javaClientLauncher.LaunchAsync(JavaClassName, 
submissionJobArgsFilePath, submissionAppArgsFilePath)
+                .GetAwaiter().GetResult();
             Logger.Log(Level.Info, "Submitted the Driver for execution.");
         }
 
@@ -167,7 +168,8 @@ namespace Org.Apache.REEF.Client.Local
             var submissionJobArgsFilePath = 
CreateBootstrapAvroJobConfig(jobRequest.JobParameters, driverFolder);
             var submissionAppArgsFilePath = 
CreateBootstrapAvroAppConfig(jobRequest.AppParameters, driverFolder);
 
-            Task.Run(() => _javaClientLauncher.Launch(JavaClassName, 
submissionJobArgsFilePath, submissionAppArgsFilePath));
+            _javaClientLauncher.LaunchAsync(JavaClassName, 
submissionJobArgsFilePath, submissionAppArgsFilePath)
+                .GetAwaiter().GetResult();
 
             var fileName = Path.Combine(driverFolder, 
_fileNames.DriverHttpEndpoint);
             JobSubmissionResult result = new LocalJobSubmissionResult(this, 
fileName);

http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs
index 8123f2c..93d447c 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Threading.Tasks;
 using Org.Apache.REEF.Tang.Annotations;
 
 namespace Org.Apache.REEF.Client.Yarn
@@ -28,7 +29,7 @@ namespace Org.Apache.REEF.Client.Yarn
         /// <param name="driverLocalFolderPath">Local folder where REEF 
application resources are staged</param>
         /// <param name="remoteUploadDirectoryPath">Remote directory path 
where we will upload resources</param>
         /// <returns>Path, modification time and size of uploaded file as 
JobResource</returns>
-        JobResource UploadArchiveResource(string driverLocalFolderPath, string 
remoteUploadDirectoryPath);
+        Task<JobResource> UploadArchiveResourceAsync(string 
driverLocalFolderPath, string remoteUploadDirectoryPath);
 
         /// <summary>
         /// Locates a file resource and uploads it to DFS destination path.
@@ -36,6 +37,6 @@ namespace Org.Apache.REEF.Client.Yarn
         /// <param name="fileLocalPath">file path</param>
         /// <param name="remoteUploadDirectoryPath">Remote directory path 
where we will upload resources</param>
         /// <returns>Path, modification time and size of uploaded file as 
JobResource</returns>
-        JobResource UploadFileResource(string fileLocalPath, string 
remoteUploadDirectoryPath);
+        Task<JobResource> UploadFileResourceAsync(string fileLocalPath, string 
remoteUploadDirectoryPath);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
index 94fde87..9e7d1f0 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
@@ -16,8 +16,8 @@
 // under the License.
 
 using System;
-using System.Collections.Generic;
 using System.IO;
+using System.Threading.Tasks;
 using Org.Apache.REEF.Client.Common;
 using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
 using Org.Apache.REEF.Common.Files;
@@ -62,24 +62,24 @@ namespace Org.Apache.REEF.Client.Yarn
             _reefFileNames = reefFileNames;
         }
 
-        public JobResource UploadArchiveResource(string driverLocalFolderPath, 
string remoteUploadDirectoryPath)
+        public async Task<JobResource> UploadArchiveResourceAsync(string 
driverLocalFolderPath, string remoteUploadDirectoryPath)
         {
             driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\";
             var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + 
@"/";
             Log.Log(Level.Info, "DriverFolderPath: {0} DriverUploadPath: {1}", 
driverLocalFolderPath, driverUploadPath);
 
             var archivePath = 
_resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);
-            return GetJobResource(archivePath, ResourceType.ARCHIVE, 
driverUploadPath, _reefFileNames.GetReefFolderName());
+            return await UploadResourceAndGetInfoAsync(archivePath, 
ResourceType.ARCHIVE, driverUploadPath, _reefFileNames.GetReefFolderName());
         }
 
-        public JobResource UploadFileResource(string fileLocalPath, string 
remoteUploadDirectoryPath)
+        public async Task<JobResource> UploadFileResourceAsync(string 
fileLocalPath, string remoteUploadDirectoryPath)
         {
             var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + 
@"/";
             var jobArgsFilePath = fileLocalPath;
-            return GetJobResource(jobArgsFilePath, ResourceType.FILE, 
driverUploadPath);
+            return await UploadResourceAndGetInfoAsync(jobArgsFilePath, 
ResourceType.FILE, driverUploadPath);
         }
 
-        private JobResource GetJobResource(string filePath, ResourceType 
resourceType, string driverUploadPath, string localizedName = null)
+        private async Task<JobResource> UploadResourceAndGetInfoAsync(string 
filePath, ResourceType resourceType, string driverUploadPath, string 
localizedName = null)
         {
             if (!_file.Exists(filePath))
             {
@@ -92,7 +92,7 @@ namespace Org.Apache.REEF.Client.Yarn
 
             try
             {
-                _javaLauncher.Launch(JavaClassNameForResourceUploader,
+                await 
_javaLauncher.LaunchAsync(JavaClassNameForResourceUploader,
                     filePath,
                     resourceType.ToString(),
                     driverUploadPath,
@@ -119,6 +119,7 @@ namespace Org.Apache.REEF.Client.Yarn
                     Log);
             }
 
+            // Single line file, easier to deal with sync read
             string fileContent = 
_file.ReadAllText(resourceDetailsOutputPath).Trim();
 
             Log.Log(Level.Info, "Java uploader returned content: " + 
fileContent);

http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
 
b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
index 8414475..9647994 100644
--- 
a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
+++ 
b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.IO;
+using System.Threading.Tasks;
 using Org.Apache.REEF.Client.Common;
 using Org.Apache.REEF.Client.Yarn;
 using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
@@ -58,7 +59,7 @@ namespace Org.Apache.REEF.Client.YARN.RestClient
             _file = file;
         }
 
-        public JobResource UploadArchiveResource(string driverLocalFolderPath, 
string remoteUploadDirectoryPath)
+        public async Task<JobResource> UploadArchiveResourceAsync(string 
driverLocalFolderPath, string remoteUploadDirectoryPath)
         {
             driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\";
             var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + 
@"/";
@@ -68,19 +69,19 @@ namespace Org.Apache.REEF.Client.YARN.RestClient
             _fileSystem.CreateDirectory(parentDirectoryUri);
 
             var archivePath = 
_resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);
-            return GetJobResource(archivePath, ResourceType.ARCHIVE, 
driverUploadPath, _reefFileNames.GetReefFolderName());
+            return await UploadResourceAndGetInfoAsync(archivePath, 
ResourceType.ARCHIVE, driverUploadPath, _reefFileNames.GetReefFolderName());
         }
 
-        public JobResource UploadFileResource(string fileLocalPath, string 
remoteUploadDirectoryPath)
+        public async Task<JobResource> UploadFileResourceAsync(string 
fileLocalPath, string remoteUploadDirectoryPath)
         {
             var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + 
@"/";
             var parentDirectoryUri = 
_fileSystem.CreateUriForPath(driverUploadPath);
 
             _fileSystem.CreateDirectory(parentDirectoryUri);
-            return GetJobResource(fileLocalPath, ResourceType.FILE, 
remoteUploadDirectoryPath);
+            return await UploadResourceAndGetInfoAsync(fileLocalPath, 
ResourceType.FILE, remoteUploadDirectoryPath);
         }
 
-        private JobResource GetJobResource(string filePath, ResourceType 
resourceType, string driverUploadPath, string localizedName = null)
+        private async Task<JobResource> UploadResourceAndGetInfoAsync(string 
filePath, ResourceType resourceType, string driverUploadPath, string 
localizedName = null)
         {
             if (!_file.Exists(filePath))
             {
@@ -94,7 +95,8 @@ namespace Org.Apache.REEF.Client.YARN.RestClient
 
             Log.Log(Level.Verbose, @"Copy {0} to {1}", filePath, 
remoteFileUri);
 
-            _fileSystem.CopyFromLocal(filePath, remoteFileUri);
+            // IFileSystem does not support async APIs; Hence we start the 
copying in a task
+            await Task.Run(() => _fileSystem.CopyFromLocal(filePath, 
remoteFileUri));
             var fileStatus = _fileSystem.GetFileStatus(remoteFileUri);
 
             return new JobResource

http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
index cbf7eb2..7380b7a 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
@@ -125,7 +125,9 @@ namespace Org.Apache.REEF.Client.Yarn
             var submissionAppArgsFilePath = 
_paramSerializer.SerializeAppFile(jobRequest.AppParameters, paramInjector, 
driverFolderPath);
 
             // Submit the driver
-            _javaClientLauncher.Launch(JavaClassName, 
submissionJobArgsFilePath, submissionAppArgsFilePath);
+            _javaClientLauncher.LaunchAsync(JavaClassName, 
submissionJobArgsFilePath, submissionAppArgsFilePath)
+                .GetAwaiter()
+                .GetResult();
             Logger.Log(Level.Info, "Submitted the Driver for execution." + 
jobRequest.JobIdentifier);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/bfe2354b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
index ae109c1..dbbd840 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
@@ -97,12 +97,18 @@ namespace Org.Apache.REEF.Client.YARN
                 _paramSerializer.SerializeAppFile(jobRequest.AppParameters, 
paramInjector, localDriverFolderPath);
                 _paramSerializer.SerializeJobFile(jobRequest.JobParameters, 
localDriverFolderPath, jobSubmissionDirectory);
 
-                var archiveResource = 
_jobResourceUploader.UploadArchiveResource(localDriverFolderPath, 
jobSubmissionDirectory);
+                var archiveResource =
+                    
_jobResourceUploader.UploadArchiveResourceAsync(localDriverFolderPath, 
jobSubmissionDirectory)
+                        .GetAwaiter()
+                        .GetResult();
 
                 // Path to the job args file.
                 var jobArgsFilePath = Path.Combine(localDriverFolderPath, 
_fileNames.GetJobSubmissionParametersFile());
 
-                var argFileResource = 
_jobResourceUploader.UploadFileResource(jobArgsFilePath, 
jobSubmissionDirectory);
+                var argFileResource =
+                    
_jobResourceUploader.UploadFileResourceAsync(jobArgsFilePath, 
jobSubmissionDirectory)
+                        .GetAwaiter()
+                        .GetResult();
 
                 // upload prepared folder to DFS
                 var jobResources = new List<JobResource> { archiveResource, 
argFileResource };

Reply via email to