[REEF-1180] Split REEFClient submission parameter file into per-application and per-job
This addressed the issue by * Add and modify avro files to reflect job and application specific parameters. * Modify command line arguments for C# submission. * Modify Java submission client and handlers to reflect change in submission arguments. JIRA: [REEF-1180](https://issues.apache.org/jira/browse/REEF-1180) Pull Request: Closes #823 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1137cdee Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1137cdee Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1137cdee Branch: refs/heads/master Commit: 1137cdee0e96e85851fc5fa8a2fdac3a87c58a4a Parents: 20369d4 Author: Andrew Chung <[email protected]> Authored: Wed Feb 3 16:19:38 2016 -0800 Committer: Andrew Chung <[email protected]> Committed: Wed Feb 10 10:47:53 2016 -0800 ---------------------------------------------------------------------- .../JobResourceUploaderTests.cs | 42 +++- .../LegacyJobResourceUploaderTests.cs | 42 +++- .../Org.Apache.REEF.Client.Tests.csproj | 9 + .../WindowsYarnJobCommandProviderTests.cs | 20 +- .../YarnREEFParamSerializerTests.cs | 196 +++++++++++++++++++ .../packages.config | 1 + .../Avro/AvroAppSubmissionParameters.cs | 82 ++++++++ .../Avro/AvroJobSubmissionParameters.cs | 28 +-- .../Local/AvroLocalAppSubmissionParameters.cs | 74 +++++++ .../Local/AvroLocalJobSubmissionParameters.cs | 74 ------- .../YARN/AvroClusterAppSubmissionParameters.cs | 73 +++++++ .../YARN/AvroYarnAppSubmissionParameters.cs | 82 ++++++++ .../AvroYarnClusterJobSubmissionParameters.cs | 12 +- .../YARN/AvroYarnJobSubmissionParameters.cs | 20 +- .../Org.Apache.REEF.Client/Local/LocalClient.cs | 42 ++-- .../Org.Apache.REEF.Client.csproj | 7 +- .../YARN/IJobResourceUploader.cs | 14 +- .../Org.Apache.REEF.Client/YARN/JobResource.cs | 10 +- .../YARN/LegacyJobResourceUploader.cs | 61 ++++-- .../YARN/RESTClient/DataModel/LocalResources.cs | 2 +- .../RESTClient/FileSystemJobResourceUploader.cs | 55 +++++- .../YARN/WindowsYarnJobCommandProvider.cs | 3 +- .../YARN/YARNREEFClient.cs | 55 +----- .../YARN/YarnREEFDotNetClient.cs | 100 ++++------ .../YARN/YarnREEFDotNetParamSerializer.cs | 114 +++++++++++ .../YARN/YarnREEFParamSerializer.cs | 137 +++++++++++++ .../Files/REEFFileNames.cs | 12 +- .../src/main/avro/AppSubmissionParameters.avsc | 62 ++++++ .../src/main/avro/JobSubmissionParameters.avsc | 18 +- .../reef/bridge/client/JobResourceUploader.java | 19 +- .../apache/reef/bridge/client/LocalClient.java | 9 +- ...ocalRuntimeDriverConfigurationGenerator.java | 11 +- .../bridge/client/LocalSubmissionFromCS.java | 50 +++-- .../YarnBootstrapDriverConfigGenerator.java | 64 ++++-- .../client/YarnBootstrapREEFLauncher.java | 19 +- .../client/YarnClusterSubmissionFromCS.java | 62 ++++-- .../bridge/client/YarnJobSubmissionClient.java | 31 ++- ...arnJobSubmissionParametersFileGenerator.java | 82 -------- .../YarnSubmissionParametersFileGenerator.java | 103 ++++++++++ ...SubmissionParametersSerializationFromCS.java | 97 ++++++--- .../reef/driver/evaluator/CLRProcess.java | 3 +- .../reef/driver/evaluator/JVMProcess.java | 3 +- .../runtime/common/files/REEFFileNames.java | 31 ++- .../common/launch/CLRLaunchCommandBuilder.java | 14 +- .../common/launch/JavaLaunchCommandBuilder.java | 13 +- .../common/launch/LaunchCommandBuilder.java | 4 +- .../launch/JavaLaunchCommandBuilderTest.java | 4 +- .../client/HDInsightJobSubmissionHandler.java | 3 +- .../client/PreparedDriverFolderLauncher.java | 3 +- .../mesos/client/MesosJobSubmissionHandler.java | 3 +- .../yarn/client/YarnJobSubmissionHandler.java | 4 +- .../yarn/client/YarnSubmissionHelper.java | 31 ++- .../runtime/yarn/client/uploader/JobFolder.java | 8 +- 53 files changed, 1573 insertions(+), 545 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs index e5a6113..5f75d59 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs +++ b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs @@ -16,6 +16,8 @@ // under the License. using System; +using System.Collections.Generic; +using System.Linq; using NSubstitute; using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Client.Yarn; @@ -32,10 +34,13 @@ namespace Org.Apache.REEF.Client.Tests private const string AnyDriverLocalFolderPath = @"Any\Local\Folder\Path\"; private const string AnyDriverResourceUploadPath = "/vol1/tmp/"; private const string AnyUploadedResourcePath = "/vol1/tmp/Path.zip"; + private const string AnyJobFileResourcePath = "/vol1/tmp/job-submission-params.json"; private const string AnyHost = "host"; private const string AnyScheme = "hdfs://"; private const string AnyUploadedResourceAbsoluteUri = AnyScheme + AnyHost + AnyUploadedResourcePath; + private const string AnyJobFileResourceAbsoluteUri = AnyScheme + AnyHost + AnyJobFileResourcePath; private const string AnyLocalArchivePath = @"Any\Local\Archive\Path.zip"; + private const string AnyLocalJobFilePath = @"Any\Local\Folder\Path\job-submission-params.json"; private const long AnyModificationTime = 1447413621; private const long AnyResourceSize = 53092; private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0); @@ -52,7 +57,7 @@ namespace Org.Apache.REEF.Client.Tests var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); - jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); // Archive file generator recieved exactly one call with correct driver local folder path testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath); @@ -64,11 +69,19 @@ namespace Org.Apache.REEF.Client.Tests var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); - var jobResource = jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + var archiveJobResource = jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + var fileJobResource = jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath); + var jobResources = new List<JobResource> { archiveJobResource, fileJobResource }; - Assert.Equal(AnyModificationTime, jobResource.LastModificationUnixTimestamp); - Assert.Equal(AnyResourceSize, jobResource.ResourceSize); - Assert.Equal(AnyUploadedResourceAbsoluteUri, jobResource.RemoteUploadPath); + foreach (var resource in jobResources) + { + Assert.Equal(AnyModificationTime, resource.LastModificationUnixTimestamp); + Assert.Equal(AnyResourceSize, resource.ResourceSize); + } + + var resourcePaths = new HashSet<string>(jobResources.Select(resource => resource.RemoteUploadPath)); + Assert.True(resourcePaths.Contains(AnyUploadedResourceAbsoluteUri)); + Assert.True(resourcePaths.Contains(AnyJobFileResourceAbsoluteUri)); } [Fact] @@ -77,14 +90,20 @@ namespace Org.Apache.REEF.Client.Tests var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); - jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath); - testContext.FileSystem.Received(1).CreateUriForPath(AnyDriverResourceUploadPath); testContext.FileSystem.Received(1).CreateUriForPath(AnyUploadedResourcePath); + + testContext.FileSystem.Received(2).CreateUriForPath(AnyDriverResourceUploadPath); testContext.FileSystem.Received(1) .CopyFromLocal(AnyLocalArchivePath, new Uri(AnyUploadedResourceAbsoluteUri)); - testContext.FileSystem.Received(1) + testContext.FileSystem.Received(2) .CreateDirectory(new Uri(AnyScheme + AnyHost + AnyDriverResourceUploadPath)); + + testContext.FileSystem.Received(1).CreateUriForPath(AnyJobFileResourcePath); + testContext.FileSystem.Received(1) + .CopyFromLocal(AnyLocalJobFilePath, new Uri(AnyJobFileResourceAbsoluteUri)); } private class TestContext @@ -98,12 +117,19 @@ namespace Org.Apache.REEF.Client.Tests var injector = TangFactory.GetTang().NewInjector(); FileSystem.GetFileStatus(new Uri(AnyUploadedResourceAbsoluteUri)) .Returns(new FileStatus(Epoch + TimeSpan.FromSeconds(AnyModificationTime), AnyResourceSize)); + FileSystem.GetFileStatus(new Uri(AnyJobFileResourceAbsoluteUri)) + .Returns(new FileStatus(Epoch + TimeSpan.FromSeconds(AnyModificationTime), AnyResourceSize)); ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath) .Returns(AnyLocalArchivePath); FileSystem.CreateUriForPath(AnyDriverResourceUploadPath) .Returns(new Uri(AnyScheme + AnyHost + AnyDriverResourceUploadPath)); FileSystem.CreateUriForPath(AnyUploadedResourcePath) .Returns(new Uri(AnyUploadedResourceAbsoluteUri)); + FileSystem.CreateUriForPath(AnyJobFileResourcePath) + .Returns(new Uri(AnyJobFileResourceAbsoluteUri)); + IFile file = Substitute.For<IFile>(); + file.Exists(Arg.Any<string>()).Returns(true); + injector.BindVolatileInstance(GenericType<IFile>.Class, file); injector.BindVolatileInstance(GenericType<IResourceArchiveFileGenerator>.Class, ResourceArchiveFileGenerator); injector.BindVolatileInstance(GenericType<IFileSystem>.Class, FileSystem); return injector.GetInstance<FileSystemJobResourceUploader>(); http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs index 2f66638..7cf9c1f 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs +++ b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs @@ -16,6 +16,7 @@ // under the License. using System; +using System.Collections.Generic; using System.IO; using NSubstitute; using Org.Apache.REEF.Client.Common; @@ -29,8 +30,9 @@ namespace Org.Apache.REEF.Client.Tests public class LegacyJobResourceUploaderTests { private const string AnyDriverLocalFolderPath = @"Any\Local\Folder\Path"; + private const string AnyLocalJobFilePath = AnyDriverLocalFolderPath + @"\job-submission-params.json"; private const string AnyDriverResourceUploadPath = "/vol1/tmp"; - private const string AnyUploadedResourcePath = "hdfs://foo/vol1/tmp/driver.zip"; + private const string AnyUploadedResourcePath = "hdfs://foo/vol1/tmp/anyFile"; private const long AnyModificationTime = 1446161745550; private const long AnyResourceSize = 53092; @@ -40,7 +42,7 @@ namespace Org.Apache.REEF.Client.Tests var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); - jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); // Archive file generator recieved exactly one call with correct driver local folder path with trailing \ testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath + @"\"); @@ -52,22 +54,36 @@ namespace Org.Apache.REEF.Client.Tests var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); const string anyLocalArchivePath = @"Any\Local\Archive\Path.zip"; + var anyLocalJobFilePath = AnyDriverLocalFolderPath.TrimEnd('\\') + @"\job-submission-params.json"; testContext.ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath + @"\") .Returns(anyLocalArchivePath); - jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath); const string javaClassNameForResourceUploader = @"org.apache.reef.bridge.client.JobResourceUploader"; Guid notUsed; // Clientlauncher is called with correct class name, local archive path, upload path and temp file. - testContext.JavaClientLauncher.Received() + testContext.JavaClientLauncher.Received(1) .Launch(javaClassNameForResourceUploader, anyLocalArchivePath, + "ARCHIVE", AnyDriverResourceUploadPath + "/", Arg.Is<string>( outputFilePath => Path.GetDirectoryName(outputFilePath) + @"\" == Path.GetTempPath() && Guid.TryParse(Path.GetFileName(outputFilePath), out notUsed))); + + // Clientlauncher is called with correct class name, local job file path, upload path and temp file. + testContext.JavaClientLauncher.Received(1) + .Launch(javaClassNameForResourceUploader, + anyLocalJobFilePath, + "FILE", + AnyDriverResourceUploadPath + "/", + Arg.Is<string>( + outputFilePath => + Path.GetDirectoryName(outputFilePath) + @"\" == Path.GetTempPath() + && Guid.TryParse(Path.GetFileName(outputFilePath), out notUsed))); } [Fact] @@ -77,7 +93,7 @@ namespace Org.Apache.REEF.Client.Tests var jobResourceUploader = testContext.GetJobResourceUploader(fileExistsReturnValue: false); // throws filenotfound exception - Assert.Throws<FileNotFoundException>(() => jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath)); + Assert.Throws<FileNotFoundException>(() => jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath)); } [Fact] @@ -86,11 +102,19 @@ namespace Org.Apache.REEF.Client.Tests var testContext = new TestContext(); var jobResourceUploader = testContext.GetJobResourceUploader(); - var jobResource = jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath); + var jobResources = new List<JobResource>() + { + jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath), + jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath) + }; - Assert.Equal(AnyModificationTime, jobResource.LastModificationUnixTimestamp); - Assert.Equal(AnyResourceSize, jobResource.ResourceSize); - Assert.Equal(AnyUploadedResourcePath, jobResource.RemoteUploadPath); + Assert.Equal(jobResources.Count, 2); + foreach (var resource in jobResources) + { + Assert.Equal(AnyModificationTime, resource.LastModificationUnixTimestamp); + Assert.Equal(AnyResourceSize, resource.ResourceSize); + Assert.Equal(AnyUploadedResourcePath, resource.RemoteUploadPath); + } } private class TestContext http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj index bc47547..c53a380 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj @@ -36,6 +36,10 @@ under the License. <BuildPackage>false</BuildPackage> </PropertyGroup> <ItemGroup> + <Reference Include="Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL"> + <HintPath>..\packages\Newtonsoft.Json.6.0.8\lib\net45\Newtonsoft.Json.dll</HintPath> + <Private>True</Private> + </Reference> <Reference Include="NSubstitute, Version=1.8.2.0, Culture=neutral, PublicKeyToken=92dd2e9066daa5ca, processorArchitecture=MSIL"> <HintPath>$(PackagesDir)\NSubstitute.1.8.2.0\lib\net45\NSubstitute.dll</HintPath> <Private>True</Private> @@ -71,6 +75,7 @@ under the License. <Compile Include="YarnClientTests.cs" /> <Compile Include="YarnConfigurationUrlProviderTests.cs" /> <Compile Include="WindowsYarnJobCommandProviderTests.cs" /> + <Compile Include="YarnREEFParamSerializerTests.cs" /> </ItemGroup> <ItemGroup> <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj"> @@ -89,6 +94,10 @@ under the License. <Project>{79E7F89A-1DFB-45E1-8D43-D71A954AEB98}</Project> <Name>Org.Apache.REEF.Utilities</Name> </ProjectReference> + <ProjectReference Include="..\Org.Apache.REEF.Driver\Org.Apache.REEF.Driver.csproj"> + <Project>{A6BAA2A7-F52F-4329-884E-1BCF711D6805}</Project> + <Name>Org.Apache.REEF.Driver</Name> + </ProjectReference> </ItemGroup> <ItemGroup> <None Include="packages.config" /> http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/WindowsYarnJobCommandProviderTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/WindowsYarnJobCommandProviderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/WindowsYarnJobCommandProviderTests.cs index a8472ad..86c3c50 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/WindowsYarnJobCommandProviderTests.cs +++ b/lang/cs/Org.Apache.REEF.Client.Tests/WindowsYarnJobCommandProviderTests.cs @@ -73,8 +73,9 @@ namespace Org.Apache.REEF.Client.Tests "arn/lib/*;%HADOOP_HOME%/share/hadoop/hdfs/*;%HADOOP_HOME%/share/hadoop" + "/hdfs/lib/*;%HADOOP_HOME%/share/hadoop/mapreduce/*;%HADOOP_HOME%/share" + "/hadoop/mapreduce/lib/*;reef/local/*;reef/global/* -Dproc_reef org.apa" + - "che.reef.bridge.client.YarnBootstrapREEFLauncher reef/local/job-submis" + - "sion-params.json 1> <LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr"; + "che.reef.bridge.client.YarnBootstrapREEFLauncher job-submission-params" + + ".json reef/local/app-submission-params.json 1> <LOG_DIR>/driver.stdout" + + " 2> <LOG_DIR>/driver.stderr"; var commandBuilder = testContext.GetCommandBuilder(); var jobSubmissionCommand = commandBuilder.GetJobSubmissionCommand(); @@ -97,8 +98,9 @@ namespace Org.Apache.REEF.Client.Tests "/hdfs/lib/*;%HADOOP_HOME%/share/hadoop/mapreduce/*;%HADOOP_HOME%/share" + "/hadoop/mapreduce/lib/*;reef/local/*;reef/global/* -Dproc_reef -Djava." + "util.logging.config.class=org.apache.reef.util.logging.Config org.apac" + - "he.reef.bridge.client.YarnBootstrapREEFLauncher reef/local/job-submiss" + - "ion-params.json 1> <LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr"; + "he.reef.bridge.client.YarnBootstrapREEFLauncher job-submission-params" + + ".json reef/local/app-submission-params.json 1> <LOG_DIR>/driver.stdout" + + " 2> <LOG_DIR>/driver.stderr"; var commandBuilder = testContext.GetCommandBuilder(true); var jobSubmissionCommand = commandBuilder.GetJobSubmissionCommand(); Assert.Equal(expectedCommand, jobSubmissionCommand); @@ -120,8 +122,9 @@ namespace Org.Apache.REEF.Client.Tests "arn/lib/*;%HADOOP_HOME%/share/hadoop/hdfs/*;%HADOOP_HOME%/share/hadoop" + "/hdfs/lib/*;%HADOOP_HOME%/share/hadoop/mapreduce/*;%HADOOP_HOME%/share" + "/hadoop/mapreduce/lib/*;reef/local/*;reef/global/* -Dproc_reef org.apa" + - "che.reef.bridge.client.YarnBootstrapREEFLauncher reef/local/job-submis" + - "sion-params.json 1> <LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr"; + "che.reef.bridge.client.YarnBootstrapREEFLauncher job-submission-params" + + ".json reef/local/app-submission-params.json 1> <LOG_DIR>/driver.stdout" + + " 2> <LOG_DIR>/driver.stderr"; string expectedCommand = string.Format(expectedCommandFormat, sizeMB); var commandBuilder = testContext.GetCommandBuilder(maxMemAllocPoolSize: sizeMB); var jobSubmissionCommand = commandBuilder.GetJobSubmissionCommand(); @@ -144,8 +147,9 @@ namespace Org.Apache.REEF.Client.Tests "arn/lib/*;%HADOOP_HOME%/share/hadoop/hdfs/*;%HADOOP_HOME%/share/hadoop" + "/hdfs/lib/*;%HADOOP_HOME%/share/hadoop/mapreduce/*;%HADOOP_HOME%/share" + "/hadoop/mapreduce/lib/*;reef/local/*;reef/global/* -Dproc_reef org.apa" + - "che.reef.bridge.client.YarnBootstrapREEFLauncher reef/local/job-submis" + - "sion-params.json 1> <LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr"; + "che.reef.bridge.client.YarnBootstrapREEFLauncher job-submission-params" + + ".json reef/local/app-submission-params.json 1> <LOG_DIR>/driver.stdout" + + " 2> <LOG_DIR>/driver.stderr"; string expectedCommand = string.Format(expectedCommandFormat, sizeMB); var commandBuilder = testContext.GetCommandBuilder(maxPermSize: sizeMB); http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs new file mode 100644 index 0000000..4fd1264 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Text; +using Newtonsoft.Json.Linq; +using Org.Apache.REEF.Client.API; +using Org.Apache.REEF.Client.Yarn; +using Org.Apache.REEF.Client.YARN; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Util; +using Xunit; + +namespace Org.Apache.REEF.Client.Tests +{ + public sealed class YarnREEFParamSerializerTests + { + private const int AnyInt = 1000; + private const string AnyString = "Any"; + + [Fact] + public void TestYarnREEFDotNetAppSerialization() + { + const string formatStr = "{{" + + "\"sharedAppSubmissionParameters\":" + + "{{" + + "\"tcpBeginPort\":{0}," + + "\"tcpRangeCount\":{0}," + + "\"tcpTryCount\":{0}" + + "}}," + + "\"driverMemory\":{0}," + + "\"driverRecoveryTimeout\":{0}" + + "}}"; + + var expectedJson = string.Format(formatStr, AnyInt); + + var tcpConf = TcpPortConfigurationModule.ConfigurationModule + .Set(TcpPortConfigurationModule.PortRangeCount, AnyInt.ToString()) + .Set(TcpPortConfigurationModule.PortRangeStart, AnyInt.ToString()) + .Set(TcpPortConfigurationModule.PortRangeTryCount, AnyInt.ToString()) + .Build(); + + var driverConf = DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<DriverStartHandler>.Class) + .Set(DriverConfiguration.DriverRestartEvaluatorRecoverySeconds, AnyInt.ToString()) + .Build(); + + var injector = TangFactory.GetTang().NewInjector(tcpConf, driverConf); + + var serializer = injector.GetInstance<YarnREEFDotNetParamSerializer>(); + var jobSubmission = injector.GetInstance<JobSubmissionBuilderFactory>() + .GetJobSubmissionBuilder().SetDriverMemory(AnyInt).Build(); + + var serializedBytes = serializer.SerializeAppArgsToBytes(jobSubmission, injector, AnyString); + var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes)); + var expectedJsonObject = JObject.Parse(expectedJson); + Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject)); + } + + [Fact] + public void TestYarnREEFDotNetJobSerialization() + { + const string formatString = + "{{" + + "\"sharedJobSubmissionParameters\":" + + "{{" + + "\"jobId\":\"{0}\"," + + "\"jobSubmissionFolder\":\"{0}\"" + + "}}," + + "\"dfsJobSubmissionFolder\":\"{0}\"," + + "\"jobSubmissionDirectoryPrefix\":\"{0}\"" + + "}}"; + + var expectedJson = string.Format(formatString, AnyString); + var injector = TangFactory.GetTang().NewInjector(); + + var serializer = injector.GetInstance<YarnREEFDotNetParamSerializer>(); + var jobSubmission = injector.GetInstance<JobSubmissionBuilderFactory>() + .GetJobSubmissionBuilder().SetJobIdentifier(AnyString).Build(); + + var serializedBytes = serializer.SerializeJobArgsToBytes(jobSubmission, AnyString, AnyString); + var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes)); + var expectedJsonObject = JObject.Parse(expectedJson); + Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject)); + } + + [Fact] + public void TestYarnREEFAppSerialization() + { + const string formatString = "{{" + + "\"yarnAppSubmissionParameters\":" + + "{{\"sharedAppSubmissionParameters\":" + + "{{\"tcpBeginPort\":{0}," + + "\"tcpRangeCount\":{0}," + + "\"tcpTryCount\":{0}" + + "}}," + + "\"driverMemory\":{0}," + + "\"driverRecoveryTimeout\":{0}" + + "}}," + + "\"maxApplicationSubmissions\":{0}" + + "}}"; + + var expectedJson = string.Format(formatString, AnyInt); + var tcpConf = TcpPortConfigurationModule.ConfigurationModule + .Set(TcpPortConfigurationModule.PortRangeCount, AnyInt.ToString()) + .Set(TcpPortConfigurationModule.PortRangeStart, AnyInt.ToString()) + .Set(TcpPortConfigurationModule.PortRangeTryCount, AnyInt.ToString()) + .Build(); + + var driverConf = DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<DriverStartHandler>.Class) + .Set(DriverConfiguration.DriverRestartEvaluatorRecoverySeconds, AnyInt.ToString()) + .Set(DriverConfiguration.MaxApplicationSubmissions, AnyInt.ToString()).Build(); + + var injector = TangFactory.GetTang().NewInjector(tcpConf, driverConf); + + var serializer = injector.GetInstance<YarnREEFParamSerializer>(); + var jobSubmission = injector.GetInstance<JobSubmissionBuilderFactory>() + .GetJobSubmissionBuilder().SetDriverMemory(AnyInt).Build(); + + var serializedBytes = serializer.SerializeAppArgsToBytes(jobSubmission, injector); + var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes)); + var expectedJsonObject = JObject.Parse(expectedJson); + Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject)); + } + + [Fact] + public void TestYarnREEFJobSerialization() + { + const string formatString = + "{{" + + "\"yarnJobSubmissionParameters\":" + + "{{" + + "\"sharedJobSubmissionParameters\":" + + "{{" + + "\"jobId\":\"{0}\"," + + "\"jobSubmissionFolder\":\"{0}\"" + + "}},\"dfsJobSubmissionFolder\":\"NULL\"," + + "\"jobSubmissionDirectoryPrefix\":\"{0}\"" + + "}}," + + "\"securityTokenKind\":\"{0}\",\"securityTokenService\":\"{0}\"" + + "}}"; + + var conf = YARNClientConfiguration.ConfigurationModule + .Set(YARNClientConfiguration.SecurityTokenKind, AnyString) + .Set(YARNClientConfiguration.SecurityTokenService, AnyString) + .Set(YARNClientConfiguration.JobSubmissionFolderPrefix, AnyString) + .Build(); + + var expectedJson = string.Format(formatString, AnyString); + var injector = TangFactory.GetTang().NewInjector(conf); + + var serializer = injector.GetInstance<YarnREEFParamSerializer>(); + var jobSubmission = injector.GetInstance<JobSubmissionBuilderFactory>() + .GetJobSubmissionBuilder().SetJobIdentifier(AnyString).Build(); + + var serializedBytes = serializer.SerializeJobArgsToBytes(jobSubmission, AnyString); + var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes)); + var expectedJsonObject = JObject.Parse(expectedJson); + Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject)); + } + + private sealed class DriverStartHandler : IObserver<IDriverStarted> + { + public void OnNext(IDriverStarted value) + { + // Intentionally empty. + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client.Tests/packages.config ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/packages.config b/lang/cs/Org.Apache.REEF.Client.Tests/packages.config index 4a71ef7..657c994 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/packages.config +++ b/lang/cs/Org.Apache.REEF.Client.Tests/packages.config @@ -18,6 +18,7 @@ specific language governing permissions and limitations under the License. --> <packages> + <package id="Newtonsoft.Json" version="6.0.8" targetFramework="net45" /> <package id="NSubstitute" version="1.8.2.0" targetFramework="net45" /> <package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" developmentDependency="true" /> <package id="xunit" version="2.1.0" targetFramework="net45" /> http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/AvroAppSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/AvroAppSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/AvroAppSubmissionParameters.cs new file mode 100644 index 0000000..f43de0d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/Avro/AvroAppSubmissionParameters.cs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Runtime.Serialization; +using Org.Apache.REEF.Utilities.Attributes; + +namespace Org.Apache.REEF.Client.Avro +{ + /// <summary> + /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters. + /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the same folder. + /// </summary> + [Private] + [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] + public sealed class AvroAppSubmissionParameters + { + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters"",""doc"":""General cross-language application submission parameters shared by all runtimes"",""fields"":[{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""}]}"; + + /// <summary> + /// Gets the schema. + /// </summary> + public static string Schema + { + get + { + return JsonSchema; + } + } + + /// <summary> + /// Gets or sets the tcpBeginPort field. + /// </summary> + [DataMember] + public int tcpBeginPort { get; set; } + + /// <summary> + /// Gets or sets the tcpRangeCount field. + /// </summary> + [DataMember] + public int tcpRangeCount { get; set; } + + /// <summary> + /// Gets or sets the tcpTryCount field. + /// </summary> + [DataMember] + public int tcpTryCount { get; set; } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroAppSubmissionParameters"/> class. + /// </summary> + public AvroAppSubmissionParameters() + { + } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroAppSubmissionParameters"/> class. + /// </summary> + /// <param name="tcpBeginPort">The tcpBeginPort.</param> + /// <param name="tcpRangeCount">The tcpRangeCount.</param> + /// <param name="tcpTryCount">The tcpTryCount.</param> + public AvroAppSubmissionParameters(int tcpBeginPort, int tcpRangeCount, int tcpTryCount) + { + this.tcpBeginPort = tcpBeginPort; + this.tcpRangeCount = tcpRangeCount; + this.tcpTryCount = tcpTryCount; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs index cb7fafd..88529dd 100644 --- a/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs +++ b/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs @@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] public sealed class AvroJobSubmissionParameters { - private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}"; + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}"; /// <summary> /// Gets the schema. @@ -48,24 +48,6 @@ namespace Org.Apache.REEF.Client.Avro public string jobId { get; set; } /// <summary> - /// Gets or sets the tcpBeginPort field. - /// </summary> - [DataMember] - public int tcpBeginPort { get; set; } - - /// <summary> - /// Gets or sets the tcpRangeCount field. - /// </summary> - [DataMember] - public int tcpRangeCount { get; set; } - - /// <summary> - /// Gets or sets the tcpTryCount field. - /// </summary> - [DataMember] - public int tcpTryCount { get; set; } - - /// <summary> /// Gets or sets the jobSubmissionFolder field. /// </summary> [DataMember] @@ -82,16 +64,10 @@ namespace Org.Apache.REEF.Client.Avro /// Initializes a new instance of the <see cref="AvroJobSubmissionParameters"/> class. /// </summary> /// <param name="jobId">The jobId.</param> - /// <param name="tcpBeginPort">The tcpBeginPort.</param> - /// <param name="tcpRangeCount">The tcpRangeCount.</param> - /// <param name="tcpTryCount">The tcpTryCount.</param> /// <param name="jobSubmissionFolder">The jobSubmissionFolder.</param> - public AvroJobSubmissionParameters(string jobId, int tcpBeginPort, int tcpRangeCount, int tcpTryCount, string jobSubmissionFolder) + public AvroJobSubmissionParameters(string jobId, string jobSubmissionFolder) { this.jobId = jobId; - this.tcpBeginPort = tcpBeginPort; - this.tcpRangeCount = tcpRangeCount; - this.tcpTryCount = tcpTryCount; this.jobSubmissionFolder = jobSubmissionFolder; } } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalAppSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalAppSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalAppSubmissionParameters.cs new file mode 100644 index 0000000..d4923f4 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalAppSubmissionParameters.cs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Runtime.Serialization; +using Org.Apache.REEF.Utilities.Attributes; + +namespace Org.Apache.REEF.Client.Avro.Local +{ + /// <summary> + /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters. + /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the same folder. + /// </summary> + [Private] + [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] + public sealed class AvroLocalAppSubmissionParameters + { + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters"",""doc"":""Cross-language application submission parameters to the Local runtime"",""fields"":[{""name"":""sharedAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters"",""doc"":""General cross-language application submission parameters shared by all runtimes"",""fields"":[{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""}]}},{""name"":""maxNumberOfConcurrentEvaluators"",""type"":""int""}]}"; + + /// <summary> + /// Gets the schema. + /// </summary> + public static string Schema + { + get + { + return JsonSchema; + } + } + + /// <summary> + /// Gets or sets the sharedAppSubmissionParameters field. + /// </summary> + [DataMember] + public AvroAppSubmissionParameters sharedAppSubmissionParameters { get; set; } + + /// <summary> + /// Gets or sets the maxNumberOfConcurrentEvaluators field. + /// </summary> + [DataMember] + public int maxNumberOfConcurrentEvaluators { get; set; } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroLocalAppSubmissionParameters"/> class. + /// </summary> + public AvroLocalAppSubmissionParameters() + { + } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroLocalAppSubmissionParameters"/> class. + /// </summary> + /// <param name="sharedAppSubmissionParameters">The sharedAppSubmissionParameters.</param> + /// <param name="maxNumberOfConcurrentEvaluators">The maxNumberOfConcurrentEvaluators.</param> + public AvroLocalAppSubmissionParameters(AvroAppSubmissionParameters sharedAppSubmissionParameters, int maxNumberOfConcurrentEvaluators) + { + this.sharedAppSubmissionParameters = sharedAppSubmissionParameters; + this.maxNumberOfConcurrentEvaluators = maxNumberOfConcurrentEvaluators; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs deleted file mode 100644 index bf90109..0000000 --- a/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs +++ /dev/null @@ -1,74 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -using System.Runtime.Serialization; -using Org.Apache.REEF.Utilities.Attributes; - -namespace Org.Apache.REEF.Client.Avro.Local -{ - /// <summary> - /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters. - /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the same folder. - /// </summary> - [Private] - [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] - public sealed class AvroLocalJobSubmissionParameters - { - private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the Local runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""maxNumberOfConcurrentEvaluators"",""type"":""int""}]}"; - - /// <summary> - /// Gets the schema. - /// </summary> - public static string Schema - { - get - { - return JsonSchema; - } - } - - /// <summary> - /// Gets or sets the sharedJobSubmissionParameters field. - /// </summary> - [DataMember] - public AvroJobSubmissionParameters sharedJobSubmissionParameters { get; set; } - - /// <summary> - /// Gets or sets the maxNumberOfConcurrentEvaluators field. - /// </summary> - [DataMember] - public int maxNumberOfConcurrentEvaluators { get; set; } - - /// <summary> - /// Initializes a new instance of the <see cref="AvroLocalJobSubmissionParameters"/> class. - /// </summary> - public AvroLocalJobSubmissionParameters() - { - } - - /// <summary> - /// Initializes a new instance of the <see cref="AvroLocalJobSubmissionParameters"/> class. - /// </summary> - /// <param name="sharedJobSubmissionParameters">The sharedJobSubmissionParameters.</param> - /// <param name="maxNumberOfConcurrentEvaluators">The maxNumberOfConcurrentEvaluators.</param> - public AvroLocalJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, int maxNumberOfConcurrentEvaluators) - { - this.sharedJobSubmissionParameters = sharedJobSubmissionParameters; - this.maxNumberOfConcurrentEvaluators = maxNumberOfConcurrentEvaluators; - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs new file mode 100644 index 0000000..a3d0866 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Runtime.Serialization; +using Org.Apache.REEF.Utilities.Attributes; + +namespace Org.Apache.REEF.Client.Avro.YARN +{ + /// <summary> + /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroYarnClusterAppSubmissionParameters. + /// </summary> + [Private] + [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] + public sealed class AvroYarnClusterAppSubmissionParameters + { + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterAppSubmissionParameters"",""doc"":""Cross-language application submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters"",""doc"":""General cross-language application submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters"",""doc"":""General cross-language application submission parameters shared by all runtimes"",""fields"":[{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""dr iverRecoveryTimeout"",""type"":""int""}]}},{""name"":""maxApplicationSubmissions"",""type"":""int""}]}"; + + /// <summary> + /// Gets the schema. + /// </summary> + public static string Schema + { + get + { + return JsonSchema; + } + } + + /// <summary> + /// Gets or sets the yarnAppSubmissionParameters field. + /// </summary> + [DataMember] + public AvroYarnAppSubmissionParameters yarnAppSubmissionParameters { get; set; } + + /// <summary> + /// Gets or sets the maxApplicationSubmissions field. + /// </summary> + [DataMember] + public int maxApplicationSubmissions { get; set; } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroYarnClusterAppSubmissionParameters"/> class. + /// </summary> + public AvroYarnClusterAppSubmissionParameters() + { + } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroYarnClusterAppSubmissionParameters"/> class. + /// </summary> + /// <param name="yarnAppSubmissionParameters">The yarnAppSubmissionParameters.</param> + /// <param name="maxApplicationSubmissions">The maxApplicationSubmissions.</param> + public AvroYarnClusterAppSubmissionParameters(AvroYarnAppSubmissionParameters yarnAppSubmissionParameters, int maxApplicationSubmissions) + { + this.yarnAppSubmissionParameters = yarnAppSubmissionParameters; + this.maxApplicationSubmissions = maxApplicationSubmissions; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs new file mode 100644 index 0000000..0612800 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Runtime.Serialization; +using Org.Apache.REEF.Utilities.Attributes; + +namespace Org.Apache.REEF.Client.Avro.YARN +{ + /// <summary> + /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters. + /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the same folder. + /// </summary> + [Private] + [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] + public sealed class AvroYarnAppSubmissionParameters + { + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters"",""doc"":""General cross-language application submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters"",""doc"":""General cross-language application submission parameters shared by all runtimes"",""fields"":[{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""}]}"; + + /// <summary> + /// Gets the schema. + /// </summary> + public static string Schema + { + get + { + return JsonSchema; + } + } + + /// <summary> + /// Gets or sets the sharedAppSubmissionParameters field. + /// </summary> + [DataMember] + public AvroAppSubmissionParameters sharedAppSubmissionParameters { get; set; } + + /// <summary> + /// Gets or sets the driverMemory field. + /// </summary> + [DataMember] + public int driverMemory { get; set; } + + /// <summary> + /// Gets or sets the driverRecoveryTimeout field. + /// </summary> + [DataMember] + public int driverRecoveryTimeout { get; set; } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroYarnAppSubmissionParameters"/> class. + /// </summary> + public AvroYarnAppSubmissionParameters() + { + } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroYarnAppSubmissionParameters"/> class. + /// </summary> + /// <param name="sharedAppSubmissionParameters">The sharedAppSubmissionParameters.</param> + /// <param name="driverMemory">The driverMemory.</param> + /// <param name="driverRecoveryTimeout">The driverRecoveryTimeout.</param> + public AvroYarnAppSubmissionParameters(AvroAppSubmissionParameters sharedAppSubmissionParameters, int driverMemory, int driverRecoveryTimeout) + { + this.sharedAppSubmissionParameters = sharedAppSubmissionParameters; + this.driverMemory = driverMemory; + this.driverRecoveryTimeout = driverRecoveryTimeout; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs index d245461..159c8cf 100644 --- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs +++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs @@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] public sealed class AvroYarnClusterJobSubmissionParameters { - private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}} ,{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""dfsJobSubmissionFolder"",""type"":[""null"",""string""]},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""}]}"; + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""na me"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""}]}"; /// <summary> /// Gets the schema. @@ -48,12 +48,6 @@ namespace Org.Apache.REEF.Client.Avro.YARN public AvroYarnJobSubmissionParameters yarnJobSubmissionParameters { get; set; } /// <summary> - /// Gets or sets the maxApplicationSubmissions field. - /// </summary> - [DataMember] - public int maxApplicationSubmissions { get; set; } - - /// <summary> /// Gets or sets the securityTokenKind field. /// </summary> [DataMember] @@ -78,13 +72,11 @@ namespace Org.Apache.REEF.Client.Avro.YARN /// Initializes a new instance of the <see cref="AvroYarnClusterJobSubmissionParameters"/> class. /// </summary> /// <param name="yarnJobSubmissionParameters">The yarnJobSubmissionParameters.</param> - /// <param name="maxApplicationSubmissions">The maxApplicationSubmissions.</param> /// <param name="securityTokenKind">The securityTokenKind.</param> /// <param name="securityTokenService">The securityTokenService.</param> - public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, int maxApplicationSubmissions, string securityTokenKind, string securityTokenService) + public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService) { this.yarnJobSubmissionParameters = yarnJobSubmissionParameters; - this.maxApplicationSubmissions = maxApplicationSubmissions; this.securityTokenKind = securityTokenKind; this.securityTokenService = securityTokenService; } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs index 65f96c3..9f03dac 100644 --- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs +++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs @@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] public sealed class AvroYarnJobSubmissionParameters { - private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}"; + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}"; /// <summary> /// Gets the schema. @@ -48,18 +48,6 @@ namespace Org.Apache.REEF.Client.Avro.YARN public AvroJobSubmissionParameters sharedJobSubmissionParameters { get; set; } /// <summary> - /// Gets or sets the driverMemory field. - /// </summary> - [DataMember] - public int driverMemory { get; set; } - - /// <summary> - /// Gets or sets the driverRecoveryTimeout field. - /// </summary> - [DataMember] - public int driverRecoveryTimeout { get; set; } - - /// <summary> /// Gets or sets the dfsJobSubmissionFolder field. /// </summary> [DataMember] @@ -83,15 +71,11 @@ namespace Org.Apache.REEF.Client.Avro.YARN /// Initializes a new instance of the <see cref="AvroYarnJobSubmissionParameters"/> class. /// </summary> /// <param name="sharedJobSubmissionParameters">The sharedJobSubmissionParameters.</param> - /// <param name="driverMemory">The driverMemory.</param> - /// <param name="driverRecoveryTimeout">The driverRecoveryTimeout.</param> /// <param name="dfsJobSubmissionFolder">The dfsJobSubmissionFolder.</param> /// <param name="jobSubmissionDirectoryPrefix">The jobSubmissionDirectoryPrefix.</param> - public AvroYarnJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, int driverMemory, int driverRecoveryTimeout, string dfsJobSubmissionFolder, string jobSubmissionDirectoryPrefix) + public AvroYarnJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, string dfsJobSubmissionFolder, string jobSubmissionDirectoryPrefix) { this.sharedJobSubmissionParameters = sharedJobSubmissionParameters; - this.driverMemory = driverMemory; - this.driverRecoveryTimeout = driverRecoveryTimeout; this.dfsJobSubmissionFolder = dfsJobSubmissionFolder; this.jobSubmissionDirectoryPrefix = jobSubmissionDirectoryPrefix; } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs index 0fb9e22..acc2a69 100644 --- a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs @@ -90,29 +90,45 @@ namespace Org.Apache.REEF.Client.Local // Intentionally left blank. } - private string CreateBootstrapAvroConfig(IJobSubmission jobSubmission, string driverFolder) + private string CreateBootstrapAvroJobConfig(IJobSubmission jobSubmission, string driverFolder) { - var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray()); - - var bootstrapArgs = new AvroJobSubmissionParameters + var bootstrapJobArgs = new AvroJobSubmissionParameters { jobSubmissionFolder = driverFolder, jobId = jobSubmission.JobIdentifier, + }; + + var submissionArgsFilePath = Path.Combine(driverFolder, _fileNames.GetJobSubmissionParametersFile()); + using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew)) + { + var serializedArgs = AvroJsonSerializer<AvroJobSubmissionParameters>.ToBytes(bootstrapJobArgs); + argsFileStream.Write(serializedArgs, 0, serializedArgs.Length); + } + + return submissionArgsFilePath; + } + + private string CreateBootstrapAvroAppConfig(IJobSubmission jobSubmission, string driverFolder) + { + var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray()); + + var bootstrapAppArgs = new AvroAppSubmissionParameters + { tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(), tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(), tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>(), }; - var avroLocalBootstrapArgs = new AvroLocalJobSubmissionParameters + var avroLocalBootstrapAppArgs = new AvroLocalAppSubmissionParameters { - sharedJobSubmissionParameters = bootstrapArgs, + sharedAppSubmissionParameters = bootstrapAppArgs, maxNumberOfConcurrentEvaluators = _maxNumberOfConcurrentEvaluators }; - var submissionArgsFilePath = Path.Combine(driverFolder, _fileNames.GetJobSubmissionParametersFile()); + var submissionArgsFilePath = Path.Combine(driverFolder, _fileNames.GetAppSubmissionParametersFile()); using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew)) { - var serializedArgs = AvroJsonSerializer<AvroLocalJobSubmissionParameters>.ToBytes(avroLocalBootstrapArgs); + var serializedArgs = AvroJsonSerializer<AvroLocalAppSubmissionParameters>.ToBytes(avroLocalBootstrapAppArgs); argsFileStream.Write(serializedArgs, 0, serializedArgs.Length); } @@ -134,17 +150,19 @@ namespace Org.Apache.REEF.Client.Local public void Submit(IJobSubmission jobSubmission) { var driverFolder = PrepareDriverFolder(jobSubmission); - var submissionArgsFilePath = CreateBootstrapAvroConfig(jobSubmission, driverFolder); - _javaClientLauncher.Launch(JavaClassName, submissionArgsFilePath); + var submissionJobArgsFilePath = CreateBootstrapAvroJobConfig(jobSubmission, driverFolder); + var submissionAppArgsFilePath = CreateBootstrapAvroAppConfig(jobSubmission, driverFolder); + _javaClientLauncher.Launch(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath); Logger.Log(Level.Info, "Submitted the Driver for execution."); } public IJobSubmissionResult SubmitAndGetJobStatus(IJobSubmission jobSubmission) { var driverFolder = PrepareDriverFolder(jobSubmission); - var submissionArgsFilePath = CreateBootstrapAvroConfig(jobSubmission, driverFolder); + var submissionJobArgsFilePath = CreateBootstrapAvroJobConfig(jobSubmission, driverFolder); + var submissionAppArgsFilePath = CreateBootstrapAvroAppConfig(jobSubmission, driverFolder); - Task.Run(() => _javaClientLauncher.Launch(JavaClassName, submissionArgsFilePath)); + Task.Run(() => _javaClientLauncher.Launch(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath)); var fileName = Path.Combine(driverFolder, _fileNames.DriverHttpEndpoint); JobSubmissionResult result = new LocalJobSubmissionResult(this, fileName); http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj index fed7dc3..60316e4 100644 --- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj +++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj @@ -69,8 +69,11 @@ under the License. <Compile Include="API\JobSubmissionBuilder.cs" /> <Compile Include="API\JobSubmissionBuilderFactory.cs" /> <Compile Include="API\TcpPortConfigurationModule.cs" /> + <Compile Include="Avro\AvroAppSubmissionParameters.cs" /> <Compile Include="Avro\AvroJobSubmissionParameters.cs" /> - <Compile Include="Avro\Local\AvroLocalJobSubmissionParameters.cs" /> + <Compile Include="Avro\Local\AvroLocalAppSubmissionParameters.cs" /> + <Compile Include="Avro\YARN\AvroClusterAppSubmissionParameters.cs" /> + <Compile Include="Avro\YARN\AvroYarnAppSubmissionParameters.cs" /> <Compile Include="Avro\YARN\AvroYarnJobSubmissionParameters.cs" /> <Compile Include="Avro\YARN\AvroYarnClusterJobSubmissionParameters.cs" /> <Compile Include="Common\DotNetFile.cs" /> @@ -158,6 +161,8 @@ under the License. <Compile Include="YARN\YARNClientConfiguration.cs" /> <Compile Include="YARN\YarnCommandLineEnvironment.cs" /> <Compile Include="YARN\YarnREEFDotNetClient.cs" /> + <Compile Include="YARN\YarnREEFDotNetParamSerializer.cs" /> + <Compile Include="YARN\YarnREEFParamSerializer.cs" /> </ItemGroup> <ItemGroup> <None Include="Avro\README.md" /> http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs index b4b1640..8123f2c 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs @@ -23,11 +23,19 @@ namespace Org.Apache.REEF.Client.Yarn public interface IJobResourceUploader { /// <summary> - /// Upload archived local driver folder to DFS destination path. + /// Creates the archive from local driver folder and uploads it to DFS destination path. /// </summary> /// <param name="driverLocalFolderPath">Local folder where REEF application resources are staged</param> - /// <param name="jobSubmissionDirectory">Remote directory path where we will upload resources</param> + /// <param name="remoteUploadDirectoryPath">Remote directory path where we will upload resources</param> /// <returns>Path, modification time and size of uploaded file as JobResource</returns> - JobResource UploadJobResource(string driverLocalFolderPath, string jobSubmissionDirectory); + JobResource UploadArchiveResource(string driverLocalFolderPath, string remoteUploadDirectoryPath); + + /// <summary> + /// Locates a file resource and uploads it to DFS destination path. + /// </summary> + /// <param name="fileLocalPath">file path</param> + /// <param name="remoteUploadDirectoryPath">Remote directory path where we will upload resources</param> + /// <returns>Path, modification time and size of uploaded file as JobResource</returns> + JobResource UploadFileResource(string fileLocalPath, string remoteUploadDirectoryPath); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/JobResource.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/JobResource.cs b/lang/cs/Org.Apache.REEF.Client/YARN/JobResource.cs index ae1a634..7b78dee 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/JobResource.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/JobResource.cs @@ -15,14 +15,22 @@ // specific language governing permissions and limitations // under the License. +using Org.Apache.REEF.Client.YARN.RestClient.DataModel; +using Org.Apache.REEF.Utilities.Attributes; + namespace Org.Apache.REEF.Client.Yarn { - public class JobResource + [Unstable("New API.")] + public sealed class JobResource { + public string Name { get; set; } + public string RemoteUploadPath { get; set; } public long LastModificationUnixTimestamp { get; set; } public long ResourceSize { get; set; } + + public ResourceType ResourceType { get; set; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs index 5fde77d..94fde87 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs @@ -16,8 +16,11 @@ // under the License. using System; +using System.Collections.Generic; using System.IO; using Org.Apache.REEF.Client.Common; +using Org.Apache.REEF.Client.YARN.RestClient.DataModel; +using Org.Apache.REEF.Common.Files; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; @@ -42,38 +45,72 @@ namespace Org.Apache.REEF.Client.Yarn private readonly IJavaClientLauncher _javaLauncher; private readonly IResourceArchiveFileGenerator _resourceArchiveFileGenerator; private readonly IFile _file; + private readonly REEFFileNames _reefFileNames; [Inject] private LegacyJobResourceUploader( IJavaClientLauncher javaLauncher, IResourceArchiveFileGenerator resourceArchiveFileGenerator, IFile file, - IYarnCommandLineEnvironment yarn) + IYarnCommandLineEnvironment yarn, + REEFFileNames reefFileNames) { _file = file; _resourceArchiveFileGenerator = resourceArchiveFileGenerator; _javaLauncher = javaLauncher; _javaLauncher.AddToClassPath(yarn.GetYarnClasspathList()); + _reefFileNames = reefFileNames; } - public JobResource UploadJobResource(string driverLocalFolderPath, string jobSubmissionDirectory) + public JobResource UploadArchiveResource(string driverLocalFolderPath, string remoteUploadDirectoryPath) { driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\"; - string driverUploadPath = jobSubmissionDirectory.TrimEnd('/') + @"/"; + var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/"; Log.Log(Level.Info, "DriverFolderPath: {0} DriverUploadPath: {1}", driverLocalFolderPath, driverUploadPath); var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath); + return GetJobResource(archivePath, ResourceType.ARCHIVE, driverUploadPath, _reefFileNames.GetReefFolderName()); + } - var resourceDetailsOutputPath = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N")); - _javaLauncher.Launch(JavaClassNameForResourceUploader, - archivePath, - driverUploadPath, - resourceDetailsOutputPath); + public JobResource UploadFileResource(string fileLocalPath, string remoteUploadDirectoryPath) + { + var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/"; + var jobArgsFilePath = fileLocalPath; + return GetJobResource(jobArgsFilePath, ResourceType.FILE, driverUploadPath); + } - return ParseGeneratedOutputFile(resourceDetailsOutputPath); + private JobResource GetJobResource(string filePath, ResourceType resourceType, string driverUploadPath, string localizedName = null) + { + if (!_file.Exists(filePath)) + { + Exceptions.Throw( + new FileNotFoundException("Could not find resource file " + filePath), + Log); + } + + var detailsOutputPath = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N")); + + try + { + _javaLauncher.Launch(JavaClassNameForResourceUploader, + filePath, + resourceType.ToString(), + driverUploadPath, + detailsOutputPath); + + var localizedResourceName = localizedName ?? Path.GetFileName(filePath); + return ParseGeneratedOutputFile(detailsOutputPath, localizedResourceName, resourceType); + } + finally + { + if (_file.Exists(detailsOutputPath)) + { + _file.Delete(detailsOutputPath); + } + } } - private JobResource ParseGeneratedOutputFile(string resourceDetailsOutputPath) + private JobResource ParseGeneratedOutputFile(string resourceDetailsOutputPath, string resourceName, ResourceType resourceType) { if (!_file.Exists(resourceDetailsOutputPath)) { @@ -91,9 +128,11 @@ namespace Org.Apache.REEF.Client.Yarn return new JobResource { + Name = resourceName, RemoteUploadPath = tokens[0], LastModificationUnixTimestamp = long.Parse(tokens[1]), - ResourceSize = long.Parse(tokens[2]) + ResourceSize = long.Parse(tokens[2]), + ResourceType = resourceType }; } } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/LocalResources.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/LocalResources.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/LocalResources.cs index c27d330..999b6b4 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/LocalResources.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/LocalResources.cs @@ -56,7 +56,7 @@ namespace Org.Apache.REEF.Client.YARN.RestClient.DataModel public long Timestamp { get; set; } } - internal enum ResourceType + public enum ResourceType { ARCHIVE,
