Repository: reef Updated Branches: refs/heads/master 24241bc8f -> 1f965d794
[REEF-1191] Allow specification of log directories in job parameters JIRA: [REEF-1191] https://issues.apache.org/jira/browse/REEF-1191 Pull Request: Closes #877 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1f965d79 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1f965d79 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1f965d79 Branch: refs/heads/master Commit: 1f965d79464f7ae800a08cd3a6e046e9ef661e23 Parents: 24241bc Author: Andrew Chung <[email protected]> Authored: Tue Mar 1 12:42:53 2016 -0800 Committer: Julia Wang <[email protected]> Committed: Tue Mar 29 12:38:35 2016 -0700 ---------------------------------------------------------------------- .../YarnREEFParamSerializerTests.cs | 6 +- .../Org.Apache.REEF.Client/API/JobParameters.cs | 38 ++++++++- .../API/JobParametersBuilder.cs | 27 ++++++- .../API/JobRequestBuilder.cs | 18 +++++ .../Local/AvroLocalJobSubmissionParameters.cs | 85 ++++++++++++++++++++ .../AvroYarnClusterJobSubmissionParameters.cs | 20 ++++- .../Org.Apache.REEF.Client/Local/LocalClient.cs | 9 ++- .../Org.Apache.REEF.Client.csproj | 4 + .../YARN/Parameters/DriverStderrFilePath.cs | 29 +++++++ .../YARN/Parameters/DriverStdoutFilePath.cs | 29 +++++++ .../YARN/WindowsYarnJobCommandProvider.cs | 8 +- .../YARN/YarnCommandProviderConfiguration.cs | 40 +++++++++ .../YARN/YarnREEFDotNetClient.cs | 26 +++++- .../YARN/YarnREEFParamSerializer.cs | 6 +- .../Files/REEFFileNames.cs | 34 ++++++++ .../src/main/avro/JobSubmissionParameters.avsc | 15 +++- .../apache/reef/bridge/client/LocalClient.java | 3 +- .../bridge/client/LocalSubmissionFromCS.java | 47 ++++++++--- .../client/YarnClusterSubmissionFromCS.java | 14 ++++ .../bridge/client/YarnJobSubmissionClient.java | 2 + ...SubmissionParametersSerializationFromCS.java | 6 +- .../local/client/LocalJobSubmissionHandler.java | 2 +- .../client/PreparedDriverFolderLauncher.java | 18 +++-- .../yarn/client/YarnSubmissionHelper.java 
| 32 +++++++- 24 files changed, 480 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs index 07ae22f..4cfc78c 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs +++ b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs @@ -156,7 +156,9 @@ namespace Org.Apache.REEF.Client.Tests "\"securityTokenKind\":\"{0}\"," + "\"securityTokenService\":\"{0}\"," + "\"maxApplicationSubmissions\":{1}," + - "\"driverMemory\":{1}" + + "\"driverMemory\":{1}," + + "\"driverStdoutFilePath\":\"{0}\"," + + "\"driverStderrFilePath\":\"{0}\"" + "}}"; var conf = YARNClientConfiguration.ConfigurationModule @@ -173,6 +175,8 @@ namespace Org.Apache.REEF.Client.Tests .SetJobIdentifier(AnyString) .SetMaxApplicationSubmissions(AnyInt) .SetDriverMemory(AnyInt) + .SetDriverStderrFilePath(AnyString) + .SetDriverStdoutFilePath(AnyString) .Build(); var serializedBytes = serializer.SerializeJobArgsToBytes(jobRequest.JobParameters, AnyString); http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs b/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs index ef5bed6..d7da2bb 100644 --- a/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs +++ b/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs @@ -16,6 +16,7 @@ // under the License. 
using System; +using Org.Apache.REEF.Utilities; namespace Org.Apache.REEF.Client.API { @@ -29,19 +30,36 @@ namespace Org.Apache.REEF.Client.API private readonly string _jobIdentifier; private readonly int _maxApplicationSubmissions; private readonly int _driverMemory; + private readonly Optional<string> _stdoutFilePath; + private readonly Optional<string> _stderrFilePath; - internal JobParameters(string jobIdentifier, int maxApplicationSubmissions, int driverMemory) + internal JobParameters( + string jobIdentifier, + int maxApplicationSubmissions, + int driverMemory, + string stdoutFilePath, + string stderrFilePath) { _jobIdentifier = jobIdentifier; _maxApplicationSubmissions = maxApplicationSubmissions; _driverMemory = driverMemory; + + _stdoutFilePath = string.IsNullOrWhiteSpace(stdoutFilePath) ? + Optional<string>.Empty() : Optional<string>.Of(stdoutFilePath); + + _stderrFilePath = string.IsNullOrWhiteSpace(stderrFilePath) ? + Optional<string>.Empty() : Optional<string>.Of(stderrFilePath); } [Obsolete("Introduced to bridge deprecation of IJobSubmission.")] internal static JobParameters FromJobSubmission(IJobSubmission jobSubmission) { return new JobParameters( - jobSubmission.JobIdentifier, jobSubmission.MaxApplicationSubmissions, jobSubmission.DriverMemory); + jobSubmission.JobIdentifier, + jobSubmission.MaxApplicationSubmissions, + jobSubmission.DriverMemory, + null, + null); } /// <summary> @@ -68,5 +86,21 @@ namespace Org.Apache.REEF.Client.API { get { return _driverMemory; } } + + /// <summary> + /// Gets the file path for stdout for the driver. + /// </summary> + public Optional<string> StdoutFilePath + { + get { return _stdoutFilePath; } + } + + /// <summary> + /// Gets the file path for stderr for the driver. 
+ /// </summary> + public Optional<string> StderrFilePath + { + get { return _stderrFilePath; } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs b/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs index f960ee7..9656a0d 100644 --- a/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs +++ b/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs @@ -25,6 +25,8 @@ namespace Org.Apache.REEF.Client.API private string _jobIdentifier; private int _maxApplicationSubmissions = 1; private int _driverMemory = 512; + private string _stdoutFilePath = null; + private string _stderrFilePath = null; private JobParametersBuilder() { @@ -44,7 +46,12 @@ namespace Org.Apache.REEF.Client.API /// <returns></returns> public JobParameters Build() { - return new JobParameters(_jobIdentifier, _maxApplicationSubmissions, _driverMemory); + return new JobParameters( + _jobIdentifier, + _maxApplicationSubmissions, + _driverMemory, + _stdoutFilePath, + _stderrFilePath); } /// <summary> @@ -76,5 +83,23 @@ namespace Org.Apache.REEF.Client.API _driverMemory = driverMemoryInMb; return this; } + + /// <summary> + /// Sets the file path to the stdout file for the driver. + /// </summary> + public JobParametersBuilder SetDriverStdoutFilePath(string stdoutFilePath) + { + _stdoutFilePath = stdoutFilePath; + return this; + } + + /// <summary> + /// Sets the file path to the stderr file for the driver. 
+ /// </summary> + public JobParametersBuilder SetDriverStderrFilePath(string stderrFilePath) + { + _stderrFilePath = stderrFilePath; + return this; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs b/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs index 9bef241..be5c0fe 100644 --- a/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs +++ b/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs @@ -156,6 +156,24 @@ namespace Org.Apache.REEF.Client.API } /// <summary> + /// Sets the stdout file path for the driver. + /// </summary> + public JobRequestBuilder SetDriverStdoutFilePath(string driverStdoutFilePath) + { + _jobParametersBuilder.SetDriverStdoutFilePath(driverStdoutFilePath); + return this; + } + + /// <summary> + /// Sets the stderr file path for the driver. 
+ /// </summary> + public JobRequestBuilder SetDriverStderrFilePath(string driverStderrFilePath) + { + _jobParametersBuilder.SetDriverStderrFilePath(driverStderrFilePath); + return this; + } + + /// <summary> /// Driver config file contents (Org.Apache.REEF.Bridge.exe.config) contents /// Can be use to redirect assembly versions /// </summary> http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs new file mode 100644 index 0000000..8b9455f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +//<auto-generated /> + +using System.Runtime.Serialization; +using Org.Apache.REEF.Utilities.Attributes; + +namespace Org.Apache.REEF.Client.Avro.Local +{ + /// <summary> + /// Used to serialize and deserialize Avro record + /// org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters. 
+ /// This is a (mostly) auto-generated class. + /// For instructions on how to regenerate, please view the README.md in the same folder. + /// </summary> + [Private] + [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] + public sealed class AvroLocalJobSubmissionParameters + { + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters"",""doc"":""Job submission parameters used by the local runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}"; + + /// <summary> + /// Gets the schema. + /// </summary> + public static string Schema + { + get + { + return JsonSchema; + } + } + + /// <summary> + /// Gets or sets the sharedJobSubmissionParameters field. + /// </summary> + [DataMember] + public AvroJobSubmissionParameters sharedJobSubmissionParameters { get; set; } + + /// <summary> + /// Gets or sets the driverStdoutFilePath field. + /// </summary> + [DataMember] + public string driverStdoutFilePath { get; set; } + + /// <summary> + /// Gets or sets the driverStderrFilePath field. + /// </summary> + [DataMember] + public string driverStderrFilePath { get; set; } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroLocalJobSubmissionParameters"/> class. + /// </summary> + public AvroLocalJobSubmissionParameters() + { + } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroLocalJobSubmissionParameters"/> class. 
+ /// </summary> + /// <param name="sharedJobSubmissionParameters">The sharedJobSubmissionParameters.</param> + /// <param name="driverStdoutFilePath">The driverStdoutFilePath.</param> + /// <param name="driverStderrFilePath">The driverStderrFilePath.</param> + public AvroLocalJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, string driverStdoutFilePath, string driverStderrFilePath) + { + this.sharedJobSubmissionParameters = sharedJobSubmissionParameters; + this.driverStdoutFilePath = driverStdoutFilePath; + this.driverStderrFilePath = driverStderrFilePath; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs index 2bac1e4..6badc52 100644 --- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs +++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs @@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] public sealed class AvroYarnClusterJobSubmissionParameters { - private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN 
runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""maxApplicationSubmissions"",""type"":""int""}]}"; + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},
{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}"; /// <summary> /// Gets the schema. @@ -72,6 +72,18 @@ namespace Org.Apache.REEF.Client.Avro.YARN public int maxApplicationSubmissions { get; set; } /// <summary> + /// Gets or sets the driverStdoutFilePath field. + /// </summary> + [DataMember] + public string driverStdoutFilePath { get; set; } + + /// <summary> + /// Gets or sets the driverStderrFilePath field. + /// </summary> + [DataMember] + public string driverStderrFilePath { get; set; } + + /// <summary> /// Initializes a new instance of the <see cref="AvroYarnClusterJobSubmissionParameters"/> class. /// </summary> public AvroYarnClusterJobSubmissionParameters() @@ -88,13 +100,17 @@ namespace Org.Apache.REEF.Client.Avro.YARN /// <param name="securityTokenService">The securityTokenService.</param> /// <param name="driverMemory">The driverMemory.</param> /// <param name="maxApplicationSubmissions">The maxApplicationSubmissions.</param> - public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService, int driverMemory, int maxApplicationSubmissions) + /// <param name="driverStdoutFilePath">The driverStdoutFilePath.</param> + /// <param name="driverStderrFilePath">The driverStderrFilePath.</param> + public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService, int driverMemory, int maxApplicationSubmissions, string driverStdoutFilePath, string driverStderrFilePath) { this.yarnJobSubmissionParameters = yarnJobSubmissionParameters; this.securityTokenKind = securityTokenKind; this.securityTokenService = 
securityTokenService; this.driverMemory = driverMemory; this.maxApplicationSubmissions = maxApplicationSubmissions; + this.driverStdoutFilePath = driverStdoutFilePath; + this.driverStderrFilePath = driverStderrFilePath; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs index c2d3db6..23e15f7 100644 --- a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs @@ -99,10 +99,17 @@ namespace Org.Apache.REEF.Client.Local jobId = jobParameters.JobIdentifier, }; + var bootstrapLocalJobArgs = new AvroLocalJobSubmissionParameters + { + sharedJobSubmissionParameters = bootstrapJobArgs, + driverStdoutFilePath = jobParameters.StdoutFilePath.IsPresent() ? jobParameters.StdoutFilePath.Value : _fileNames.GetDriverStdoutFileName(), + driverStderrFilePath = jobParameters.StderrFilePath.IsPresent() ? 
jobParameters.StderrFilePath.Value : _fileNames.GetDriverStderrFileName() + }; + var submissionArgsFilePath = Path.Combine(driverFolder, _fileNames.GetJobSubmissionParametersFile()); using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew)) { - var serializedArgs = AvroJsonSerializer<AvroJobSubmissionParameters>.ToBytes(bootstrapJobArgs); + var serializedArgs = AvroJsonSerializer<AvroLocalJobSubmissionParameters>.ToBytes(bootstrapLocalJobArgs); argsFileStream.Write(serializedArgs, 0, serializedArgs.Length); } http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj index 14184c4..67e663b 100644 --- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj +++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj @@ -78,6 +78,7 @@ under the License. <Compile Include="Avro\AvroAppSubmissionParameters.cs" /> <Compile Include="Avro\AvroJobSubmissionParameters.cs" /> <Compile Include="Avro\Local\AvroLocalAppSubmissionParameters.cs" /> + <Compile Include="Avro\Local\AvroLocalJobSubmissionParameters.cs" /> <Compile Include="Avro\YARN\AvroYarnAppSubmissionParameters.cs" /> <Compile Include="Avro\YARN\AvroYarnJobSubmissionParameters.cs" /> <Compile Include="Avro\YARN\AvroYarnClusterJobSubmissionParameters.cs" /> @@ -112,6 +113,8 @@ under the License. 
<Compile Include="YARN\IJobSubmissionDirectoryProvider.cs" /> <Compile Include="YARN\Parameters\DriverMaxMemoryAllicationPoolSizeMB.cs" /> <Compile Include="YARN\Parameters\DriverMaxPermSizeMB.cs" /> + <Compile Include="YARN\Parameters\DriverStderrFilePath.cs" /> + <Compile Include="YARN\Parameters\DriverStdoutFilePath.cs" /> <Compile Include="YARN\RestClient\HttpClient.cs" /> <Compile Include="YARN\RestClient\IDeserializer.cs" /> <Compile Include="YARN\RestClient\IHttpClient.cs" /> @@ -154,6 +157,7 @@ under the License. <Compile Include="YARN\RestClient\IUrlProvider.cs" /> <Compile Include="YARN\RestClient\FileSystemJobResourceUploader.cs" /> <Compile Include="YARN\RestClient\MultipleRMUrlProvider.cs" /> + <Compile Include="YARN\YarnCommandProviderConfiguration.cs" /> <Compile Include="YARN\YarnJobSubmissionResult.cs" /> <Compile Include="YARN\YARNREEFClient.cs" /> <Compile Include="YARN\RestClient\RestRequestExecutor.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/DriverStderrFilePath.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/DriverStderrFilePath.cs b/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/DriverStderrFilePath.cs new file mode 100644 index 0000000..d9def4b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/DriverStderrFilePath.cs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Client.YARN.Parameters +{ + [NamedParameter("Driver stderr file path for YARN.", defaultValue: "<LOG_DIR>/driver.stderr")] + internal sealed class DriverStderrFilePath : Name<string> + { + private DriverStderrFilePath() + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/DriverStdoutFilePath.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/DriverStdoutFilePath.cs b/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/DriverStdoutFilePath.cs new file mode 100644 index 0000000..43e5a1a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/DriverStdoutFilePath.cs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Client.YARN.Parameters +{ + [NamedParameter("Driver stdout file path for YARN.", defaultValue: "<LOG_DIR>/driver.stdout")] + internal sealed class DriverStdoutFilePath : Name<string> + { + private DriverStdoutFilePath() + { + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs index 21e3f7d..a929ec1 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs @@ -44,6 +44,8 @@ namespace Org.Apache.REEF.Client.YARN private readonly REEFFileNames _fileNames; private readonly bool _enableDebugLogging; private readonly IYarnCommandLineEnvironment _yarnCommandLineEnvironment; + private readonly string _driverStdoutFilePath; + private readonly string _driverStderrFilePath; private readonly int _driverMaxMemoryAllocationPoolSizeMB; private readonly int _driverMaxPermSizeMB; @@ -52,6 +54,8 @@ namespace Org.Apache.REEF.Client.YARN [Parameter(typeof(EnableDebugLogging))] bool enableDebugLogging, [Parameter(typeof(DriverMaxMemoryAllicationPoolSizeMB))] int driverMaxMemoryAllocationPoolSizeMB, [Parameter(typeof(DriverMaxPermSizeMB))] int driverMaxPermSizeMB, + [Parameter(typeof(DriverStdoutFilePath))] string driverStdoutFilePath, + [Parameter(typeof(DriverStderrFilePath))] string driverStderrFilePath, IYarnCommandLineEnvironment yarnCommandLineEnvironment, REEFFileNames fileNames) { @@ -59,6 +63,8 @@ namespace Org.Apache.REEF.Client.YARN _enableDebugLogging = enableDebugLogging; _fileNames = fileNames; 
_driverMaxMemoryAllocationPoolSizeMB = driverMaxMemoryAllocationPoolSizeMB; + _driverStdoutFilePath = driverStdoutFilePath; + _driverStderrFilePath = driverStderrFilePath; _driverMaxPermSizeMB = driverMaxPermSizeMB; } @@ -97,7 +103,7 @@ namespace Org.Apache.REEF.Client.YARN _fileNames.GetReefFolderName(), _fileNames.GetLocalFolderName(), _fileNames.GetAppSubmissionParametersFile())); - sb.Append(" " + _fileNames.GetDriverLoggingConfigCommand()); + sb.Append(" " + string.Format("1> {0} 2> {1}", _driverStdoutFilePath, _driverStderrFilePath)); return sb.ToString(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandProviderConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandProviderConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandProviderConfiguration.cs new file mode 100644 index 0000000..b508074 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandProviderConfiguration.cs @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using Org.Apache.REEF.Client.YARN.Parameters; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; + +namespace Org.Apache.REEF.Client.YARN +{ + internal class YarnCommandProviderConfiguration : ConfigurationModuleBuilder + { + public static readonly OptionalParameter<string> DriverStdoutFilePath = new OptionalParameter<string>(); + public static readonly OptionalParameter<string> DriverStderrFilePath = new OptionalParameter<string>(); + + public static ConfigurationModule ConfigurationModule + { + get + { + return new YarnCommandProviderConfiguration() + .BindNamedParameter(GenericType<DriverStdoutFilePath>.Class, DriverStdoutFilePath) + .BindNamedParameter(GenericType<DriverStderrFilePath>.Class, DriverStderrFilePath) + .Build(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs index dbbd840..0f58852 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs @@ -28,6 +28,7 @@ using Org.Apache.REEF.Client.YARN.RestClient.DataModel; using Org.Apache.REEF.Common.Files; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; @@ -43,27 +44,27 @@ namespace Org.Apache.REEF.Client.YARN { private const string REEFApplicationType = @"REEF"; private static readonly Logger Log = Logger.GetLogger(typeof(YarnREEFDotNetClient)); + private readonly IInjector _injector; private readonly IYarnRMClient _yarnRMClient; private readonly DriverFolderPreparationHelper _driverFolderPreparationHelper; private readonly IJobResourceUploader 
_jobResourceUploader; - private readonly IYarnJobCommandProvider _yarnJobCommandProvider; private readonly REEFFileNames _fileNames; private readonly IJobSubmissionDirectoryProvider _jobSubmissionDirectoryProvider; private readonly YarnREEFDotNetParamSerializer _paramSerializer; [Inject] private YarnREEFDotNetClient( + IInjector injector, IYarnRMClient yarnRMClient, DriverFolderPreparationHelper driverFolderPreparationHelper, IJobResourceUploader jobResourceUploader, - IYarnJobCommandProvider yarnJobCommandProvider, REEFFileNames fileNames, IJobSubmissionDirectoryProvider jobSubmissionDirectoryProvider, YarnREEFDotNetParamSerializer paramSerializer) { + _injector = injector; _jobSubmissionDirectoryProvider = jobSubmissionDirectoryProvider; _fileNames = fileNames; - _yarnJobCommandProvider = yarnJobCommandProvider; _jobResourceUploader = jobResourceUploader; _driverFolderPreparationHelper = driverFolderPreparationHelper; _yarnRMClient = yarnRMClient; @@ -157,7 +158,24 @@ namespace Org.Apache.REEF.Client.YARN int maxApplicationSubmissions, IReadOnlyCollection<JobResource> jobResources) { - string command = _yarnJobCommandProvider.GetJobSubmissionCommand(); + var commandProviderConfigModule = YarnCommandProviderConfiguration.ConfigurationModule; + if (jobParameters.StdoutFilePath.IsPresent()) + { + commandProviderConfigModule = commandProviderConfigModule + .Set(YarnCommandProviderConfiguration.DriverStdoutFilePath, jobParameters.StdoutFilePath.Value); + } + + if (jobParameters.StderrFilePath.IsPresent()) + { + commandProviderConfigModule = commandProviderConfigModule + .Set(YarnCommandProviderConfiguration.DriverStderrFilePath, jobParameters.StderrFilePath.Value); + } + + var yarnJobCommandProvider = _injector.ForkInjector(commandProviderConfigModule.Build()) + .GetInstance<IYarnJobCommandProvider>(); + + var command = yarnJobCommandProvider.GetJobSubmissionCommand(); + Log.Log(Level.Verbose, "Command for YARN: {0}", command); Log.Log(Level.Verbose, "ApplicationID: 
{0}", appId); Log.Log(Level.Verbose, "MaxApplicationSubmissions: {0}", maxApplicationSubmissions); http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs index 9fcbb63..a6a8584 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs @@ -123,7 +123,11 @@ namespace Org.Apache.REEF.Client.YARN securityTokenService = _securityTokenService, yarnJobSubmissionParameters = avroYarnJobSubmissionParameters, driverMemory = jobParameters.DriverMemoryInMB, - maxApplicationSubmissions = jobParameters.MaxApplicationSubmissions + maxApplicationSubmissions = jobParameters.MaxApplicationSubmissions, + driverStdoutFilePath = string.IsNullOrWhiteSpace(jobParameters.StdoutFilePath.Value) ? + _fileNames.GetDefaultYarnDriverStdoutFilePath() : jobParameters.StdoutFilePath.Value, + driverStderrFilePath = string.IsNullOrWhiteSpace(jobParameters.StderrFilePath.Value) ? + _fileNames.GetDefaultYarnDriverStderrFilePath() : jobParameters.StderrFilePath.Value }; return AvroJsonSerializer<AvroYarnClusterJobSubmissionParameters>.ToBytes(avroYarnClusterJobSubmissionParameters); http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs index 94e2e6b..872ff59 100644 --- a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs +++ b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+using System; using System.Diagnostics.CodeAnalysis; using System.IO; using Org.Apache.REEF.Tang.Annotations; @@ -55,6 +56,9 @@ namespace Org.Apache.REEF.Common.Files private const string SECURITY_TOKEN_PASSWORD_FILE = "SecurityTokenPwd"; private const string APP_SUBMISSION_PARAMETERS_FILE = "app-submission-params.json"; private const string JOB_SUBMISSION_PARAMETERS_FILE = "job-submission-params.json"; + private const string YARN_DEFAULT_DRIVER_OUT_VAR = "<LOG_DIR>"; + private const string YARN_DRIVER_STDOUT_PATH = YARN_DEFAULT_DRIVER_OUT_VAR + "/driver.stdout"; + private const string YARN_DRIVER_STDERR_PATH = YARN_DEFAULT_DRIVER_OUT_VAR + "/driver.stderr"; private const string DRIVER_COMMAND_LOGGING_CONFIG = "1> <LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr"; [Inject] @@ -262,10 +266,40 @@ namespace Org.Apache.REEF.Common.Files } /// <summary> + /// Returns the default driver log output variable for YARN. + /// Expands into the constant "<LOG_DIR>". + /// </summary> + /// <returns>"<LOG_DIR>"</returns> + public string GetYarnDriverLogOutputVariable() + { + return YARN_DEFAULT_DRIVER_OUT_VAR; + } + + /// <summary> + /// The default YARN Driver stdout file path. + /// </summary> + /// <returns></returns> + public string GetDefaultYarnDriverStdoutFilePath() + { + return YARN_DRIVER_STDOUT_PATH; + } + + /// <summary> + /// The default YARN Driver stderr file path. + /// </summary> + /// <returns></returns> + public string GetDefaultYarnDriverStderrFilePath() + { + return YARN_DRIVER_STDERR_PATH; + } + + /// <summary> /// The command that allows redirecting Driver stdout and stderr logs /// to appropriate files /// </summary> /// <returns></returns> + [Obsolete("Deprecated in 0.15. Will be removed. 
" + + "Please use GetDefaultYarnDriverStdoutFilePath or GetDefaultYarnDriverStderrFilePath.")] public string GetDriverLoggingConfigCommand() { return DRIVER_COMMAND_LOGGING_CONFIG; http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc index 70dc14c..2f2a0c4 100644 --- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc +++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc @@ -30,6 +30,17 @@ { "namespace": "org.apache.reef.reef.bridge.client.avro", "type": "record", + "name": "AvroLocalJobSubmissionParameters", + "doc": "Job submission parameters used by the local runtime", + "fields": [ + { "name": "sharedJobSubmissionParameters", "type": "AvroJobSubmissionParameters" }, + { "name": "driverStdoutFilePath", "type": "string" }, + { "name": "driverStderrFilePath", "type": "string" } + ] + }, + { + "namespace": "org.apache.reef.reef.bridge.client.avro", + "type": "record", "name": "AvroYarnJobSubmissionParameters", "doc": "General cross-language submission parameters to the YARN runtime", "fields": [ @@ -48,7 +59,9 @@ { "name": "securityTokenKind", "type": "string", "default": "NULL" }, { "name": "securityTokenService", "type": "string", "default": "NULL" }, { "name": "driverMemory", "type": "int" }, - { "name": "maxApplicationSubmissions", "type": "int" } + { "name": "maxApplicationSubmissions", "type": "int" }, + { "name": "driverStdoutFilePath", "type": "string" }, + { "name": "driverStderrFilePath", "type": "string" } ] } ] http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java index 3c94acc..b7a2ec1 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java @@ -56,7 +56,8 @@ public final class LocalClient { configurationGenerator.writeConfiguration(localSubmissionFromCS.getJobFolder(), localSubmissionFromCS.getJobId(), CLIENT_REMOTE_ID); - launcher.launch(driverFolder, localSubmissionFromCS.getJobId(), CLIENT_REMOTE_ID); + launcher.launch(driverFolder, localSubmissionFromCS.getDriverStdoutPath(), + localSubmissionFromCS.getDriverStderrPath()); } public static void main(final String[] args) throws IOException, InjectionException { http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java index cc5bb82..bf96d73 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java @@ -21,12 +21,13 @@ package org.apache.reef.bridge.client; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.JsonDecoder; import org.apache.avro.specific.SpecificDatumReader; +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.Validate; import 
org.apache.reef.client.parameters.DriverConfigurationProviders; import org.apache.reef.io.TcpPortConfigurationProvider; import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters; -import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters; import org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters; +import org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix; import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; @@ -58,8 +59,10 @@ final class LocalSubmissionFromCS { private final int tcpBeginPort; private final int tcpRangeCount; private final int tcpTryCount; + private final String driverStdoutPath; + private final String driverStderrPath; - private LocalSubmissionFromCS(final AvroJobSubmissionParameters avroJobSubmissionParameters, + private LocalSubmissionFromCS(final AvroLocalJobSubmissionParameters avroLocalJobSubmissionParameters, final AvroLocalAppSubmissionParameters avroLocalAppSubmissionParameters) { // We assume the given path to be the one of the driver. The job folder is one level up from there. 
final AvroAppSubmissionParameters appSubmissionParameters = @@ -69,10 +72,14 @@ final class LocalSubmissionFromCS { this.tcpTryCount = appSubmissionParameters.getTcpTryCount(); this.maxNumberOfConcurrentEvaluators = avroLocalAppSubmissionParameters.getMaxNumberOfConcurrentEvaluators(); - this.driverFolder = new File(avroJobSubmissionParameters.getJobSubmissionFolder().toString()); - this.jobId = avroJobSubmissionParameters.getJobId().toString(); + this.driverFolder = new File(avroLocalJobSubmissionParameters + .getSharedJobSubmissionParameters().getJobSubmissionFolder().toString()); + + this.jobId = avroLocalJobSubmissionParameters.getSharedJobSubmissionParameters().getJobId().toString(); this.jobFolder = driverFolder.getParentFile(); this.runtimeRootFolder = jobFolder.getParentFile(); + this.driverStdoutPath = avroLocalJobSubmissionParameters.getDriverStdoutFilePath().toString(); + this.driverStderrPath = avroLocalJobSubmissionParameters.getDriverStderrFilePath().toString(); Validate.isTrue(driverFolder.exists(), "The driver folder does not exist."); Validate.notEmpty(jobId, "The job is is null or empty."); @@ -80,6 +87,8 @@ final class LocalSubmissionFromCS { Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0."); Validate.isTrue(tcpRangeCount > 0, "The tcp range given is <= 0."); Validate.isTrue(tcpTryCount > 0, "The tcp retry count given is <= 0."); + Validate.isTrue(StringUtils.isNotEmpty(driverStdoutPath), "The stdout path is empty."); + Validate.isTrue(StringUtils.isNotEmpty(driverStderrPath), "The stderr path is empty."); } /** @@ -138,21 +147,35 @@ final class LocalSubmissionFromCS { } /** + * @return The Driver stdout path. + */ + String getDriverStdoutPath() { + return driverStdoutPath; + } + + /** + * @return The Driver stderr path. + */ + String getDriverStderrPath() { + return driverStderrPath; + } + + /** * Takes the local job submission configuration file, deserializes it, and creates submission object. 
*/ - static LocalSubmissionFromCS fromSubmissionParameterFiles(final File jobSubmissionParametersFile, + static LocalSubmissionFromCS fromSubmissionParameterFiles(final File localJobSubmissionParametersFile, final File localAppSubmissionParametersFile) throws IOException { final AvroLocalAppSubmissionParameters localAppSubmissionParameters; - final AvroJobSubmissionParameters jobSubmissionParameters; + final AvroLocalJobSubmissionParameters localJobSubmissionParameters; - try (final FileInputStream fileInputStream = new FileInputStream(jobSubmissionParametersFile)) { + try (final FileInputStream fileInputStream = new FileInputStream(localJobSubmissionParametersFile)) { final JsonDecoder decoder = DecoderFactory.get().jsonDecoder( - AvroJobSubmissionParameters.getClassSchema(), fileInputStream); - final SpecificDatumReader<AvroJobSubmissionParameters> reader = - new SpecificDatumReader<>(AvroJobSubmissionParameters.class); - jobSubmissionParameters = reader.read(null, decoder); + AvroLocalJobSubmissionParameters.getClassSchema(), fileInputStream); + final SpecificDatumReader<AvroLocalJobSubmissionParameters> reader = + new SpecificDatumReader<>(AvroLocalJobSubmissionParameters.class); + localJobSubmissionParameters = reader.read(null, decoder); } try (final FileInputStream fileInputStream = new FileInputStream(localAppSubmissionParametersFile)) { @@ -163,6 +186,6 @@ final class LocalSubmissionFromCS { localAppSubmissionParameters = reader.read(null, decoder); } - return new LocalSubmissionFromCS(jobSubmissionParameters, localAppSubmissionParameters); + return new LocalSubmissionFromCS(localJobSubmissionParameters, localAppSubmissionParameters); } } http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java index 44eaffc..ccde624 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java @@ -54,6 +54,8 @@ final class YarnClusterSubmissionFromCS { private final String tokenKind; private final String tokenService; private final String jobSubmissionDirectoryPrefix; + private final String yarnDriverStdoutFilePath; + private final String yarnDriverStderrFilePath; private final AvroYarnAppSubmissionParameters yarnAppSubmissionParameters; private final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters; @@ -82,6 +84,8 @@ final class YarnClusterSubmissionFromCS { this.tokenKind = yarnClusterJobSubmissionParameters.getSecurityTokenKind().toString(); this.tokenService = yarnClusterJobSubmissionParameters.getSecurityTokenService().toString(); this.jobSubmissionDirectoryPrefix = yarnJobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString(); + this.yarnDriverStdoutFilePath = yarnClusterJobSubmissionParameters.getDriverStdoutFilePath().toString(); + this.yarnDriverStderrFilePath = yarnClusterJobSubmissionParameters.getDriverStderrFilePath().toString(); Validate.notEmpty(jobId, "The job id is null or empty"); Validate.isTrue(driverMemory > 0, "The amount of driver memory given is <= 0."); @@ -93,6 +97,8 @@ final class YarnClusterSubmissionFromCS { Validate.notEmpty(tokenKind, "Token kind should be either NULL or some custom non empty value"); Validate.notEmpty(tokenService, "Token service should be either NULL or some custom non empty value"); Validate.notEmpty(jobSubmissionDirectoryPrefix, "Job submission directory prefix should not be empty"); + 
Validate.notEmpty(yarnDriverStdoutFilePath, "Driver stdout file path should not be empty"); + Validate.notEmpty(yarnDriverStderrFilePath, "Driver stderr file path should not be empty"); } @Override @@ -177,6 +183,14 @@ final class YarnClusterSubmissionFromCS { return driverRecoveryTimeout; } + String getYarnDriverStdoutFilePath() { + return yarnDriverStdoutFilePath; + } + + String getYarnDriverStderrFilePath() { + return yarnDriverStderrFilePath; + } + /** * @return The submission parameters for YARN applications. */ http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java index d43ee8d..e76f025 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java @@ -148,6 +148,8 @@ public final class YarnJobSubmissionClient { .setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0) .setLauncherClass(YarnBootstrapREEFLauncher.class) .setConfigurationFilePaths(confFiles) + .setDriverStdoutPath(yarnSubmission.getYarnDriverStdoutFilePath()) + .setDriverStderrPath(yarnSubmission.getYarnDriverStderrFilePath()) .submit(); writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(), submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath()); http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java index accdd0c..afa49be 100644 --- a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java +++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java @@ -64,7 +64,9 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { "\"securityTokenKind\":\"" + NULL_REP + "\"," + "\"securityTokenService\":\"" + NULL_REP + "\"," + "\"maxApplicationSubmissions\":" + NUMBER_REP + "," + - "\"driverMemory\":" + NUMBER_REP + + "\"driverMemory\":" + NUMBER_REP + "," + + "\"driverStdoutFilePath\":" + STRING_REP_QUOTED + "," + + "\"driverStderrFilePath\":" + STRING_REP_QUOTED + "}"; private static final String AVRO_YARN_APP_PARAMETERS_SERIALIZED_STRING = @@ -92,6 +94,8 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { assert yarnClusterSubmissionFromCS.getMaxApplicationSubmissions() == NUMBER_REP; assert yarnClusterSubmissionFromCS.getTokenKind().equals(NULL_REP); assert yarnClusterSubmissionFromCS.getTokenService().equals(NULL_REP); + assert yarnClusterSubmissionFromCS.getYarnDriverStderrFilePath().equals(STRING_REP); + assert yarnClusterSubmissionFromCS.getYarnDriverStdoutFilePath().equals(STRING_REP); verifyYarnJobSubmissionParams(yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters(), yarnClusterSubmissionFromCS.getYarnAppSubmissionParameters()); http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java index ecdc17f..3951c73 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java @@ -107,7 +107,7 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler { this.configurationSerializer.toFile(driverConfiguration, new File(driverFolder, this.fileNames.getDriverConfigurationPath())); - this.driverLauncher.launch(driverFolder, t.getIdentifier(), t.getRemoteId()); + this.driverLauncher.launch(driverFolder); } catch (final Exception e) { LOG.log(Level.SEVERE, "Unable to setup driver.", e); throw new RuntimeException("Unable to setup driver.", e); http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java index e324cea..141e3aa 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java @@ -67,29 +67,33 @@ public class PreparedDriverFolderLauncher { this.commandPrefixList = commandPrefixList; } + + /** * Launches the driver prepared in driverFolder. 
* * @param driverFolder - * @param jobId - * @param clientRemoteId */ - public void launch(final File driverFolder, final String jobId, final String clientRemoteId) { + public void launch(final File driverFolder) { + launch(driverFolder, this.fileNames.getDriverStdoutFileName(), this.fileNames.getDriverStderrFileName()); + } + + public void launch(final File driverFolder, final String stdoutFilePath, final String stderrFilePath) { assert driverFolder.isDirectory(); - final List<String> command = makeLaunchCommand(jobId, clientRemoteId); + final List<String> command = makeLaunchCommand(); final RunnableProcess process = new RunnableProcess(command, "driver", driverFolder, new LoggingRunnableProcessObserver(), - this.fileNames.getDriverStdoutFileName(), - this.fileNames.getDriverStderrFileName()); + stdoutFilePath, + stderrFilePath); this.executor.submit(process); this.executor.shutdown(); } - private List<String> makeLaunchCommand(final String jobId, final String clientRemoteId) { + private List<String> makeLaunchCommand() { final List<String> command = new JavaLaunchCommandBuilder(commandPrefixList) .setConfigurationFilePaths(Collections.singletonList(this.fileNames.getDriverConfigurationPath())) http://git-wip-us.apache.org/repos/asf/reef/blob/1f965d79/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java index 0847f4d..3eb92f4 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java @@ -54,6 +54,8 @@ public final class YarnSubmissionHelper implements 
Closeable{ private final ClasspathProvider classpath; private final SecurityTokenProvider tokenProvider; private final List<String> commandPrefixList; + private String driverStdoutFilePath; + private String driverStderrFilePath; private Class launcherClazz; private List<String> configurationFilePaths; @@ -65,6 +67,12 @@ public final class YarnSubmissionHelper implements Closeable{ this.fileNames = fileNames; this.classpath = classpath; + this.driverStdoutFilePath = + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName(); + + this.driverStderrFilePath = + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStderrFileName(); + LOG.log(Level.FINE, "Initializing YARN Client"); this.yarnClient = YarnClient.createYarnClient(); this.yarnClient.init(yarnConfiguration); @@ -221,6 +229,26 @@ public final class YarnSubmissionHelper implements Closeable{ return this; } + /** + * Sets the Driver stdout file path. + * @param driverStdoutPath + * @return + */ + public YarnSubmissionHelper setDriverStdoutPath(final String driverStdoutPath) { + this.driverStdoutFilePath = driverStdoutPath; + return this; + } + + /** + * Sets the Driver stderr file path. 
+ * @param driverStderrPath + * @return + */ + public YarnSubmissionHelper setDriverStderrPath(final String driverStderrPath) { + this.driverStderrFilePath = driverStderrPath; + return this; + } + public void submit() throws IOException, YarnException { // SET EXEC COMMAND @@ -228,8 +256,8 @@ public final class YarnSubmissionHelper implements Closeable{ .setConfigurationFilePaths(configurationFilePaths) .setClassPath(this.classpath.getDriverClasspath()) .setMemory(this.applicationSubmissionContext.getResource().getMemory()) - .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName()) - .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStderrFileName()) + .setStandardOut(driverStdoutFilePath) + .setStandardErr(driverStderrFilePath) .build(); if (this.applicationSubmissionContext.getKeepContainersAcrossApplicationAttempts() &&
