Repository: incubator-reef Updated Branches: refs/heads/master d169b2591 -> b99b96a00
[REEF-940] Create .NET job resource uploader using IFileSystem JIRA: [REEF-940](https://issues.apache.org/jira/browse/REEF-940) Pull Request: Closes #637 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/b99b96a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/b99b96a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/b99b96a0 Branch: refs/heads/master Commit: b99b96a00f4a02918876fb7fcf49cff52346a1c4 Parents: d169b25 Author: Anupam <anupam...@gmail.com> Authored: Fri Nov 13 11:57:22 2015 -0800 Committer: Andrew Chung <afchun...@gmail.com> Committed: Wed Nov 18 17:18:30 2015 -0800 ---------------------------------------------------------------------- .../JobResourceUploaderTests.cs | 130 +++++++++++++++++++ .../LegacyJobResourceUploaderTests.cs | 2 +- .../Org.Apache.REEF.Client.Tests.csproj | 9 +- .../Org.Apache.REEF.Client.csproj | 5 + .../YARN/IJobSubmissionDirectoryProvider.cs | 4 +- .../YARN/JobSubmissionDirectoryProvider.cs | 2 +- .../YARN/LegacyJobResourceUploader.cs | 2 +- .../RESTClient/FileSystemJobResourceUploader.cs | 81 ++++++++++++ 8 files changed, 228 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs new file mode 100644 index 0000000..3479842 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using NSubstitute; +using Org.Apache.REEF.Client.Common; +using Org.Apache.REEF.Client.Yarn; +using Org.Apache.REEF.Client.YARN.RestClient; +using Org.Apache.REEF.IO.FileSystem; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Util; + +namespace Org.Apache.REEF.Client.Tests +{ + /// <summary> + /// Tests for <see cref="FileSystemJobResourceUploader"/>, run against NSubstitute substitutes of its three dependencies. + /// </summary> + [TestClass] + public class JobResourceUploaderTests + { + private const string AnyDriverLocalFolderPath = @"Any\Local\Folder\Path\"; + private const string AnyDriverResourceUploadPath = "/vol1/tmp/"; + private const string AnyUploadedResourcePath = "/vol1/tmp/Path.zip"; + private const string AnyHost = "host"; + private const string AnyScheme = "hdfs://"; + private const string AnyUploadedResourceAbsoluteUri = AnyScheme + AnyHost + AnyUploadedResourcePath; + private const string AnyLocalArchivePath = @"Any\Local\Archive\Path.zip"; + private const long AnyModificationTime = 1447413621; + private const long AnyResourceSize = 53092; + private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0); + + [TestMethod] + public void JobResourceUploaderCanInstantiateWithDefaultBindings() + { + TangFactory.GetTang().NewInjector().GetInstance<FileSystemJobResourceUploader>(); + } + + [TestMethod] + 
public void UploadJobResourceCreatesResourceArchive() + { + var testContext = new TestContext(); + var jobResourceUploader = testContext.GetJobResourceUploader(); + + jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath); + + // Archive file generator received exactly one call with correct driver local folder path + testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath); + } + + [TestMethod] + public void UploadJobResourceReturnsJobResourceDetails() + { + var testContext = new TestContext(); + var jobResourceUploader = testContext.GetJobResourceUploader(); + + var jobResource = jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath); + + Assert.AreEqual(AnyModificationTime, jobResource.LastModificationUnixTimestamp); + Assert.AreEqual(AnyResourceSize, jobResource.ResourceSize); + Assert.AreEqual(AnyUploadedResourceAbsoluteUri, jobResource.RemoteUploadPath); + } + + [TestMethod] + public void UploadJobResourceMakesCorrectFileSystemCalls() + { + var testContext = new TestContext(); + var jobResourceUploader = testContext.GetJobResourceUploader(); + + jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath); + + testContext.FileSystem.Received(1).CreateUriForPath(AnyDriverResourceUploadPath); + testContext.FileSystem.Received(1).CreateUriForPath(AnyUploadedResourcePath); + testContext.FileSystem.Received(1) + .CopyFromLocal(AnyLocalArchivePath, new Uri(AnyUploadedResourceAbsoluteUri)); + testContext.FileSystem.Received(1) + .CreateDirectory(new Uri(AnyScheme + AnyHost + AnyDriverResourceUploadPath)); + } + + [TestMethod] + public void UploadJobResourceCallsJobSubmissionDirProvider() + { + var testContext = new TestContext(); + var jobResourceUploader = testContext.GetJobResourceUploader(); + + jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath); + + testContext.JobSubmissionDirectoryProvider.Received(1).GetJobSubmissionRemoteDirectory(); + } + + /// <summary> + /// Owns the substitute dependencies and builds an uploader bound to them. + /// </summary> + private class TestContext + { + + public 
readonly IJobSubmissionDirectoryProvider JobSubmissionDirectoryProvider = + Substitute.For<IJobSubmissionDirectoryProvider>(); + public readonly IResourceArchiveFileGenerator ResourceArchiveFileGenerator = + Substitute.For<IResourceArchiveFileGenerator>(); + public readonly IFileSystem FileSystem = Substitute.For<IFileSystem>(); + + /// <summary> + /// Stubs the substitutes with the Any* constants, binds them into a fresh injector + /// and returns the uploader that injector creates. + /// </summary> + public FileSystemJobResourceUploader GetJobResourceUploader() + { + var injector = TangFactory.GetTang().NewInjector(); + JobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().Returns(AnyDriverResourceUploadPath); + FileSystem.GetFileStatus(new Uri(AnyUploadedResourceAbsoluteUri)) + .Returns(new FileStatus(Epoch + TimeSpan.FromSeconds(AnyModificationTime), AnyResourceSize)); + ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath) + .Returns(AnyLocalArchivePath); + FileSystem.CreateUriForPath(AnyDriverResourceUploadPath) + .Returns(new Uri(AnyScheme + AnyHost + AnyDriverResourceUploadPath)); + FileSystem.CreateUriForPath(AnyUploadedResourcePath) + .Returns(new Uri(AnyUploadedResourceAbsoluteUri)); + injector.BindVolatileInstance(GenericType<IJobSubmissionDirectoryProvider>.Class, JobSubmissionDirectoryProvider); + injector.BindVolatileInstance(GenericType<IResourceArchiveFileGenerator>.Class, ResourceArchiveFileGenerator); + injector.BindVolatileInstance(GenericType<IFileSystem>.Class, FileSystem); + return injector.GetInstance<FileSystemJobResourceUploader>(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs index e76ae66..9b4e4f1 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs +++ 
b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs @@ -61,7 +61,7 @@ namespace Org.Apache.REEF.Client.Tests const string anyLocalArchivePath = @"Any\Local\Archive\Path.zip"; testContext.ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath + @"\") .Returns(anyLocalArchivePath); - testContext.JobSubmissionDirectoryProvider.GetJobSubmissionDirectory().Returns(AnyDriverResourceUploadPath); + testContext.JobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().Returns(AnyDriverResourceUploadPath); jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath); const string javaClassNameForResourceUploader = @"org.apache.reef.bridge.client.JobResourceUploader"; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj index e7af026..8094981 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj @@ -47,6 +47,7 @@ under the License. <Reference Include="System.ServiceProcess" /> </ItemGroup> <ItemGroup> + <Compile Include="JobResourceUploaderTests.cs" /> <Compile Include="LegacyJobResourceUploaderTests.cs" /> <Compile Include="MultipleRMUrlProviderTests.cs" /> <Compile Include="WindowsHadoopEmulatorYarnClientTests.cs" /> @@ -55,11 +56,15 @@ under the License. 
<Compile Include="YarnConfigurationUrlProviderTests.cs" /> </ItemGroup> <ItemGroup> - <ProjectReference Include="..\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj"> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj"> <Project>{5094c35b-4fdb-4322-ac05-45d684501cbf}</Project> <Name>Org.Apache.REEF.Client</Name> </ProjectReference> - <ProjectReference Include="..\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.IO\Org.Apache.REEF.IO.csproj"> + <Project>{DEC0F0A8-DBEF-4EBF-B69C-E2369C15ABF1}</Project> + <Name>Org.Apache.REEF.IO</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> <Project>{97DBB573-3994-417A-9F69-FFA25F00D2A6}</Project> <Name>Org.Apache.REEF.Tang</Name> </ProjectReference> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj index e456716..910ae2d 100644 --- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj +++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj @@ -113,6 +113,7 @@ under the License. <Compile Include="YARN\RestClient\DataModel\Tokens.cs" /> <Compile Include="YARN\RestClient\IRestRequestExecutor.cs" /> <Compile Include="YARN\RestClient\IUrlProvider.cs" /> + <Compile Include="YARN\RestClient\FileSystemJobResourceUploader.cs" /> <Compile Include="YARN\RestClient\MultipleRMUrlProvider.cs" /> <Compile Include="YARN\RestClient\RestJsonDeserializer.cs" /> <Compile Include="YARN\RestClient\RestJsonSerializer.cs" /> @@ -181,6 +182,10 @@ under the License. 
<Project>{75503f90-7b82-4762-9997-94b5c68f15db}</Project> <Name>Org.Apache.REEF.Examples</Name> </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.IO\Org.Apache.REEF.IO.csproj"> + <Project>{DEC0F0A8-DBEF-4EBF-B69C-E2369C15ABF1}</Project> + <Name>Org.Apache.REEF.IO</Name> + </ProjectReference> </ItemGroup> <ItemGroup> <EmbeddedResource Include="$(TempResxFile)"> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs index b702e79..3faa62d 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs @@ -26,9 +26,9 @@ namespace Org.Apache.REEF.Client.Yarn public interface IJobSubmissionDirectoryProvider { /// <summary> - /// Returns path to job submission directory. + /// Returns path to job submission directory in DFS. 
/// </summary> /// <returns></returns> - string GetJobSubmissionDirectory(); + string GetJobSubmissionRemoteDirectory(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs index 4a5c59a..fe1b007 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs @@ -37,7 +37,7 @@ namespace Org.Apache.REEF.Client.Yarn _jobSubmissionDirectoryPrefix = jobSubmissionDirectoryPrefix; } - public string GetJobSubmissionDirectory() + public string GetJobSubmissionRemoteDirectory() { return string.Format(@"{0}/{1}{2}/", _jobSubmissionDirectoryPrefix, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs index 22e32a1..0c01709 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs @@ -59,7 +59,7 @@ namespace Org.Apache.REEF.Client.Yarn public JobResource UploadJobResource(string driverLocalFolderPath) { driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\"; - string driverUploadPath = _jobSubmissionDirectoryProvider.GetJobSubmissionDirectory().TrimEnd('/') + @"/"; + string driverUploadPath = _jobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().TrimEnd('/') + @"/"; Log.Log(Level.Info, "DriverFolderPath: {0} DriverUploadPath: {1}", 
driverLocalFolderPath, driverUploadPath); var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs new file mode 100644 index 0000000..cfa83b0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO; +using Org.Apache.REEF.Client.Common; +using Org.Apache.REEF.Client.Yarn; +using Org.Apache.REEF.IO.FileSystem; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Client.YARN.RestClient +{ + /// <summary> + /// Provides a file-system-agnostic job resource uploader. + /// Users can provide a custom implementation of + /// <see cref="IFileSystem"/> for their choice of DFS. 
+ /// </summary> + internal sealed class FileSystemJobResourceUploader : IJobResourceUploader + { + private static readonly Logger Log = Logger.GetLogger(typeof(FileSystemJobResourceUploader)); + // Reference instant 1970-01-01 00:00:00 from which modification times are measured. + private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0); + private readonly IJobSubmissionDirectoryProvider _jobSubmissionDirectoryProvider; + private readonly IResourceArchiveFileGenerator _resourceArchiveFileGenerator; + private readonly IFileSystem _fileSystem; + + /// <summary> + /// Constructor marked [Inject]; it only stores the three collaborators in fields. + /// </summary> + [Inject] + private FileSystemJobResourceUploader( + IJobSubmissionDirectoryProvider jobSubmissionDirectoryProvider, + IResourceArchiveFileGenerator resourceArchiveFileGenerator, + IFileSystem fileSystem) + { + _fileSystem = fileSystem; + _resourceArchiveFileGenerator = resourceArchiveFileGenerator; + _jobSubmissionDirectoryProvider = jobSubmissionDirectoryProvider; + } + + /// <summary> + /// Requests an archive for the given local driver folder, copies it into the job submission + /// remote directory through <see cref="IFileSystem"/>, and reports where it landed. + /// </summary> + /// <param name="driverLocalFolderPath">Local driver folder; any trailing backslashes are replaced by exactly one.</param> + /// <returns>A <see cref="JobResource"/> holding the remote URI plus the size in bytes and the + /// modification time (as a Unix timestamp) read back from the file system after the copy.</returns> + public JobResource UploadJobResource(string driverLocalFolderPath) + { + // Normalize both paths so that each ends with exactly one separator. + driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\"; + var driverUploadPath = _jobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().TrimEnd('/') + @"/"; + Log.Log(Level.Verbose, "DriverFolderPath: {0} DriverUploadPath: {1}", driverLocalFolderPath, driverUploadPath); + var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath); + + // The remote file keeps the archive's local file name. + var destinationPath = driverUploadPath + Path.GetFileName(archivePath); + var remoteFileUri = _fileSystem.CreateUriForPath(destinationPath); + Log.Log(Level.Verbose, @"Copy {0} to {1}", archivePath, remoteFileUri); + + // The parent directory is created before the copy is attempted. + var parentDirectoryUri = _fileSystem.CreateUriForPath(driverUploadPath); + _fileSystem.CreateDirectory(parentDirectoryUri); + _fileSystem.CopyFromLocal(archivePath, remoteFileUri); + var fileStatus = _fileSystem.GetFileStatus(remoteFileUri); + + return new JobResource + { + LastModificationUnixTimestamp = DateTimeToUnixTimestamp(fileStatus.ModificationTime), + RemoteUploadPath = remoteFileUri.AbsoluteUri, + ResourceSize = fileStatus.LengthBytes + 
}; + } + + /// <summary> + /// Returns the number of seconds between <see cref="Epoch"/> and <paramref name="dateTime"/>, + /// with the fractional part dropped by the cast to long. + /// NOTE(review): no DateTimeKind or time-zone conversion is applied here; presumably the file system + /// reports modification times in the same zone as Epoch - confirm. + /// </summary> + private long DateTimeToUnixTimestamp(DateTime dateTime) + { + return (long) (dateTime - Epoch).TotalSeconds; + } + } +} \ No newline at end of file