Repository: reef Updated Branches: refs/heads/master 7f8793952 -> 47568d78c
[REEF-1799] Add FileSystemUrl parameter to REEF client and driver This parameter is required to run REEF jobs on Azure Data Lake. It is optional, therefore backward compatible. Summary of changes: * Update AVRO schema to add FileSystemUrl * Add Named parameter FileSystemUrl as both .Net and Java side * Make it available at both client and driver side * Update test cases JIRA: [REEF-1799](https://issues.apache.org/jira/browse/REEF-1799) This closes #1310 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/47568d78 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/47568d78 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/47568d78 Branch: refs/heads/master Commit: 47568d78c22a76bbbf85d0e3467e062ee9e46ac5 Parents: 7f87939 Author: Julia Wang <[email protected]> Authored: Thu May 18 19:52:14 2017 -0700 Committer: Sergiy Matusevych <[email protected]> Committed: Mon Jul 10 16:45:56 2017 -0700 ---------------------------------------------------------------------- .../YarnREEFParamSerializerTests.cs | 5 ++- .../AvroYarnClusterJobSubmissionParameters.cs | 2 +- .../YARN/AvroYarnJobSubmissionParameters.cs | 12 +++++-- .../Org.Apache.REEF.Client.csproj | 1 + .../YARN/Parameters/FileSystemUrl.cs | 27 ++++++++++++++ .../YARN/YARNClientConfiguration.cs | 9 +++++ .../YARN/YarnREEFDotNetParamSerializer.cs | 7 +++- .../YARN/YarnREEFParamSerializer.cs | 10 ++++-- .../src/main/avro/JobSubmissionParameters.avsc | 1 + .../YarnBootstrapDriverConfigGenerator.java | 3 ++ .../client/YarnClusterSubmissionFromCS.java | 11 ++++++ .../bridge/client/YarnJobSubmissionClient.java | 7 ++++ ...SubmissionParametersSerializationFromCS.java | 2 ++ .../yarn/driver/YarnContainerManager.java | 10 ++++-- .../yarn/driver/parameters/FileSystemUrl.java | 37 ++++++++++++++++++++ 15 files changed, 134 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs index 4cfc78c..23c8725 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs +++ b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs @@ -83,10 +83,11 @@ namespace Org.Apache.REEF.Client.Tests "\"jobSubmissionFolder\":\"{0}\"" + "}}," + "\"dfsJobSubmissionFolder\":\"{0}\"," + + "\"fileSystemUrl\":\"{1}\"," + "\"jobSubmissionDirectoryPrefix\":\"{0}\"" + "}}"; - var expectedJson = string.Format(formatString, AnyString); + var expectedJson = string.Format(formatString, AnyString, "NULL"); var injector = TangFactory.GetTang().NewInjector(); var serializer = injector.GetInstance<YarnREEFDotNetParamSerializer>(); @@ -151,6 +152,7 @@ namespace Org.Apache.REEF.Client.Tests "\"jobSubmissionFolder\":\"{0}\"" + "}}," + "\"dfsJobSubmissionFolder\":\"NULL\"," + + "\"fileSystemUrl\":\"{0}\"," + "\"jobSubmissionDirectoryPrefix\":\"{0}\"" + "}}," + "\"securityTokenKind\":\"{0}\"," + @@ -164,6 +166,7 @@ namespace Org.Apache.REEF.Client.Tests var conf = YARNClientConfiguration.ConfigurationModule .Set(YARNClientConfiguration.SecurityTokenKind, AnyString) .Set(YARNClientConfiguration.SecurityTokenService, AnyString) + .Set(YARNClientConfiguration.FileSystemUrl, AnyString) .Set(YARNClientConfiguration.JobSubmissionFolderPrefix, AnyString) .Build(); http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs 
b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs index 6badc52..c4e82fb 100644 --- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs +++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs @@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] public sealed class AvroYarnClusterJobSubmissionParameters { - private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}"; + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters 
to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""fileSystemUrl"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}"; /// <summary> /// Gets the schema. 
http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs index 9f03dac..1e599e5 100644 --- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs +++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs @@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] public sealed class AvroYarnJobSubmissionParameters { - private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}"; + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all 
runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""fileSystemUrl"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}"; /// <summary> /// Gets the schema. @@ -60,6 +60,12 @@ namespace Org.Apache.REEF.Client.Avro.YARN public string jobSubmissionDirectoryPrefix { get; set; } /// <summary> + /// Gets or sets file system url + /// </summary> + [DataMember] + public string fileSystemUrl { get; set; } + + /// <summary> /// Initializes a new instance of the <see cref="AvroYarnJobSubmissionParameters"/> class. /// </summary> public AvroYarnJobSubmissionParameters() @@ -73,10 +79,12 @@ namespace Org.Apache.REEF.Client.Avro.YARN /// <param name="sharedJobSubmissionParameters">The sharedJobSubmissionParameters.</param> /// <param name="dfsJobSubmissionFolder">The dfsJobSubmissionFolder.</param> /// <param name="jobSubmissionDirectoryPrefix">The jobSubmissionDirectoryPrefix.</param> - public AvroYarnJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, string dfsJobSubmissionFolder, string jobSubmissionDirectoryPrefix) + /// <param name="fileSystemUrl">The file system URL prefix.</param> + public AvroYarnJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, string dfsJobSubmissionFolder, string fileSystemUrl, string jobSubmissionDirectoryPrefix) { this.sharedJobSubmissionParameters = sharedJobSubmissionParameters; this.dfsJobSubmissionFolder = dfsJobSubmissionFolder; + this.fileSystemUrl = fileSystemUrl; this.jobSubmissionDirectoryPrefix = jobSubmissionDirectoryPrefix; } } http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj 
b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj index 57c1ed4..c8df560 100644 --- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj +++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj @@ -115,6 +115,7 @@ under the License. <Compile Include="YARN\Parameters\DriverMaxPermSizeMB.cs" /> <Compile Include="YARN\Parameters\DriverStderrFilePath.cs" /> <Compile Include="YARN\Parameters\DriverStdoutFilePath.cs" /> + <Compile Include="YARN\Parameters\FileSystemUrl.cs" /> <Compile Include="YARN\RestClient\HttpClient.cs" /> <Compile Include="YARN\RestClient\IDeserializer.cs" /> <Compile Include="YARN\RestClient\IHttpClient.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/FileSystemUrl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/FileSystemUrl.cs b/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/FileSystemUrl.cs new file mode 100644 index 0000000..8a4a06c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/YARN/Parameters/FileSystemUrl.cs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Client.YARN.Parameters +{ + [NamedParameter("FileSystem URL.", defaultValue: DefaultValue)] + public sealed class FileSystemUrl : Name<string> + { + public const string DefaultValue = "NULL"; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs index f4463bc..20127c6 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs @@ -35,12 +35,20 @@ namespace Org.Apache.REEF.Client.Yarn public static readonly OptionalParameter<string> SecurityTokenService = new OptionalParameter<string>(); public static readonly OptionalImpl<IYarnRestClientCredential> YarnRestClientCredential = new OptionalImpl<IYarnRestClientCredential>(); + /// <summary> + /// URL for store. For Hadoop file system, it is set in fs.defaultFS as default by YARN environment. Client doesn't need to + /// specify it. For Data Lake, Yarn applications are required to set the complete path by themselves + /// e.g. 
adl://reefadl.azuredatalakestore.net + /// </summary> + public static readonly OptionalParameter<string> FileSystemUrl = new OptionalParameter<string>(); + public static ConfigurationModule ConfigurationModule = new YARNClientConfiguration() .BindImplementation(GenericType<IREEFClient>.Class, GenericType<YarnREEFClient>.Class) .BindImplementation(GenericType<IYarnRestClientCredential>.Class, YarnRestClientCredential) .BindNamedParameter(GenericType<JobSubmissionDirectoryPrefixParameter>.Class, JobSubmissionFolderPrefix) .BindNamedParameter(GenericType<SecurityTokenKindParameter>.Class, SecurityTokenKind) .BindNamedParameter(GenericType<SecurityTokenServiceParameter>.Class, SecurityTokenService) + .BindNamedParameter(GenericType<FileSystemUrl>.Class, FileSystemUrl) .Build(); [Unstable("This is temporary configuration until REEF-70 is completed when ConfigurationModule" + @@ -51,6 +59,7 @@ namespace Org.Apache.REEF.Client.Yarn .BindNamedParameter(GenericType<JobSubmissionDirectoryPrefixParameter>.Class, JobSubmissionFolderPrefix) .BindNamedParameter(GenericType<SecurityTokenKindParameter>.Class, SecurityTokenKind) .BindNamedParameter(GenericType<SecurityTokenServiceParameter>.Class, SecurityTokenService) + .BindNamedParameter(GenericType<FileSystemUrl>.Class, FileSystemUrl) .Build(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs index 5981c13..544b185 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs @@ -19,6 +19,7 @@ using System.IO; using Org.Apache.REEF.Client.API; using Org.Apache.REEF.Client.Avro; using 
Org.Apache.REEF.Client.Avro.YARN; +using Org.Apache.REEF.Client.YARN.Parameters; using Org.Apache.REEF.Common.Avro; using Org.Apache.REEF.Common.Files; using Org.Apache.REEF.Driver.Bridge; @@ -34,11 +35,14 @@ namespace Org.Apache.REEF.Client.YARN internal sealed class YarnREEFDotNetParamSerializer { private readonly REEFFileNames _fileNames; + private readonly string _fileSystemUrl; [Inject] - private YarnREEFDotNetParamSerializer(REEFFileNames fileNames) + private YarnREEFDotNetParamSerializer(REEFFileNames fileNames, + [Parameter(typeof(FileSystemUrl))] string fileSystemUrl) { _fileNames = fileNames; + _fileSystemUrl = fileSystemUrl; } /// <summary> @@ -104,6 +108,7 @@ namespace Org.Apache.REEF.Client.YARN { jobSubmissionDirectoryPrefix = jobSubmissionDirectory, dfsJobSubmissionFolder = jobSubmissionDirectory, + fileSystemUrl = _fileSystemUrl, sharedJobSubmissionParameters = avroJobSubmissionParameters }; http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs index a6a8584..bcaf6fd 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs @@ -38,6 +38,7 @@ namespace Org.Apache.REEF.Client.YARN private readonly REEFFileNames _fileNames; private readonly string _securityTokenKind; private readonly string _securityTokenService; + private readonly string _fileSystemUrl; private readonly string _jobSubmissionPrefix; [Inject] @@ -45,11 +46,13 @@ namespace Org.Apache.REEF.Client.YARN REEFFileNames fileNames, [Parameter(typeof(SecurityTokenKindParameter))] string securityTokenKind, [Parameter(typeof(SecurityTokenServiceParameter))] string securityTokenService, + [Parameter(typeof(FileSystemUrl))] string 
fileSystemUrl, [Parameter(typeof(JobSubmissionDirectoryPrefixParameter))] string jobSubmissionPrefix) { _fileNames = fileNames; _jobSubmissionPrefix = jobSubmissionPrefix; _securityTokenKind = securityTokenKind; + _fileSystemUrl = fileSystemUrl; _securityTokenService = securityTokenService; } @@ -113,15 +116,16 @@ namespace Org.Apache.REEF.Client.YARN var avroYarnJobSubmissionParameters = new AvroYarnJobSubmissionParameters { - jobSubmissionDirectoryPrefix = _jobSubmissionPrefix, - sharedJobSubmissionParameters = avroJobSubmissionParameters + sharedJobSubmissionParameters = avroJobSubmissionParameters, + fileSystemUrl = _fileSystemUrl, + jobSubmissionDirectoryPrefix = _jobSubmissionPrefix }; var avroYarnClusterJobSubmissionParameters = new AvroYarnClusterJobSubmissionParameters { + yarnJobSubmissionParameters = avroYarnJobSubmissionParameters, securityTokenKind = _securityTokenKind, securityTokenService = _securityTokenService, - yarnJobSubmissionParameters = avroYarnJobSubmissionParameters, driverMemory = jobParameters.DriverMemoryInMB, maxApplicationSubmissions = jobParameters.MaxApplicationSubmissions, driverStdoutFilePath = string.IsNullOrWhiteSpace(jobParameters.StdoutFilePath.Value) ? 
http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc index 2f2a0c4..62bc757 100644 --- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc +++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc @@ -46,6 +46,7 @@ "fields": [ { "name": "sharedJobSubmissionParameters", "type": "AvroJobSubmissionParameters" }, { "name": "dfsJobSubmissionFolder", "type": "string", "default": "NULL" }, + { "name": "fileSystemUrl", "type": "string", "default": "NULL" }, { "name": "jobSubmissionDirectoryPrefix", "type": "string" } ] }, http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java index 303b8d8..9814e84 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java @@ -34,6 +34,7 @@ import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.yarn.driver.RuntimeIdentifier; import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration; import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration; +import org.apache.reef.runtime.yarn.driver.parameters.FileSystemUrl; import 
org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix; import org.apache.reef.tang.*; import org.apache.reef.tang.formats.ConfigurationSerializer; @@ -109,6 +110,8 @@ final class YarnBootstrapDriverConfigGenerator { .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(appSubmissionParams.getTcpTryCount())) .bindNamedParameter(JobSubmissionDirectoryPrefix.class, yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString()) + .bindNamedParameter(FileSystemUrl.class, + yarnJobSubmissionParams.getFileSystemUrl().toString()) .build(); final Configuration driverConfiguration = Configurations.merge( http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java index ccde624..71968b9 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java @@ -54,6 +54,7 @@ final class YarnClusterSubmissionFromCS { private final String tokenKind; private final String tokenService; private final String jobSubmissionDirectoryPrefix; + private final String fileSystemUrl; private final String yarnDriverStdoutFilePath; private final String yarnDriverStderrFilePath; @@ -83,6 +84,7 @@ final class YarnClusterSubmissionFromCS { this.queue = DEFAULT_QUEUE; this.tokenKind = yarnClusterJobSubmissionParameters.getSecurityTokenKind().toString(); this.tokenService = yarnClusterJobSubmissionParameters.getSecurityTokenService().toString(); + this.fileSystemUrl = 
yarnJobSubmissionParameters.getFileSystemUrl().toString(); this.jobSubmissionDirectoryPrefix = yarnJobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString(); this.yarnDriverStdoutFilePath = yarnClusterJobSubmissionParameters.getDriverStdoutFilePath().toString(); this.yarnDriverStderrFilePath = yarnClusterJobSubmissionParameters.getDriverStderrFilePath().toString(); @@ -96,6 +98,7 @@ final class YarnClusterSubmissionFromCS { Validate.notEmpty(queue, "The queue is null or empty"); Validate.notEmpty(tokenKind, "Token kind should be either NULL or some custom non empty value"); Validate.notEmpty(tokenService, "Token service should be either NULL or some custom non empty value"); + Validate.notEmpty(fileSystemUrl, "File system Url should be either NULL or some custom non empty value"); Validate.notEmpty(jobSubmissionDirectoryPrefix, "Job submission directory prefix should not be empty"); Validate.notEmpty(yarnDriverStdoutFilePath, "Driver stdout file path should not be empty"); Validate.notEmpty(yarnDriverStderrFilePath, "Driver stderr file path should not be empty"); @@ -116,6 +119,7 @@ final class YarnClusterSubmissionFromCS { ", queue='" + queue + '\'' + ", tokenKind='" + tokenKind + '\'' + ", tokenService='" + tokenService + '\'' + + ", fileSystemUrl='" + fileSystemUrl + '\'' + ", jobSubmissionDirectoryPrefix='" + jobSubmissionDirectoryPrefix + '\'' + '}'; } @@ -170,6 +174,13 @@ final class YarnClusterSubmissionFromCS { } /** + * @return The file system url + */ + String getFileSystemUrl() { + return fileSystemUrl; + } + + /** * @return The max amount of times the application can be submitted. 
*/ int getMaxApplicationSubmissions(){ http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java index 6455f75..f536bd3 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java @@ -40,6 +40,7 @@ import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper; import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser; import org.apache.reef.runtime.yarn.client.uploader.JobFolder; import org.apache.reef.runtime.yarn.client.uploader.JobUploader; +import org.apache.reef.runtime.yarn.driver.parameters.FileSystemUrl; import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix; import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; import org.apache.reef.tang.Configuration; @@ -273,6 +274,12 @@ public final class YarnJobSubmissionClient { LOG.log(Level.FINE, "Did not find security token"); } + if (!yarnSubmission.getFileSystemUrl().equalsIgnoreCase(FileSystemUrl.DEFAULT_VALUE)) { + LOG.log(Level.INFO, "getFileSystemUrl: {0}", yarnSubmission.getFileSystemUrl()); + } else { + LOG.log(Level.INFO, "FileSystemUrl is not set"); + } + final List<String> launchCommandPrefix = new ArrayList<String>() {{ add(new REEFFileNames().getDriverLauncherExeFile().toString()); }}; http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java index 490ed2e..b8024f9 100644 --- a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java +++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java @@ -54,6 +54,7 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { "\"jobSubmissionFolder\":" + STRING_REP_QUOTED + "}," + "\"dfsJobSubmissionFolder\":\"" + STRING_REP + "\"," + + "\"fileSystemUrl\":\"" + STRING_REP + "\"," + "\"jobSubmissionDirectoryPrefix\":" + STRING_REP_QUOTED + "}"; @@ -345,6 +346,7 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { assert sharedJobSubmissionParams.getJobId().toString().equals(STRING_REP); assert sharedJobSubmissionParams.getJobSubmissionFolder().toString().equals(STRING_REP); assert jobSubmissionParameters.getDfsJobSubmissionFolder().toString().equals(STRING_REP); + assert jobSubmissionParameters.getFileSystemUrl().toString().equals(STRING_REP); assert jobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString().equals(STRING_REP); } http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java index 1b99613..f89afa0 100644 --- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java @@ -43,6 +43,7 @@ import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser; +import org.apache.reef.runtime.yarn.driver.parameters.FileSystemUrl; import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory; import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod; import org.apache.reef.tang.InjectionFuture; @@ -89,6 +90,7 @@ final class YarnContainerManager implements AMRMClientAsync.CallbackHandler, NMC private final String amRegistrationHost; private final String jobSubmissionDirectory; + private final String fileSystemUrl; private final REEFFileNames reefFileNames; private final RackNameFormatter rackNameFormatter; private final InjectionFuture<ProgressProvider> progressProvider; @@ -97,6 +99,7 @@ final class YarnContainerManager implements AMRMClientAsync.CallbackHandler, NMC private YarnContainerManager( @Parameter(YarnHeartbeatPeriod.class) final int yarnRMHeartbeatPeriod, @Parameter(JobSubmissionDirectory.class) final String jobSubmissionDirectory, + @Parameter(FileSystemUrl.class) final String fileSystemUrl, final YarnConfiguration yarnConf, final YarnProxyUser yarnProxyUser, final REEFEventHandlers reefEventHandlers, @@ -127,11 +130,14 @@ final class YarnContainerManager implements AMRMClientAsync.CallbackHandler, NMC this.nodeManager = new NMClientAsyncImpl(this); this.jobSubmissionDirectory = jobSubmissionDirectory; + this.fileSystemUrl = fileSystemUrl; this.reefFileNames = reefFileNames; this.progressProvider = progressProvider; - LOG.log(Level.FINEST, "Instantiated 
YarnContainerManager: {0} {1}", - new Object[] {this.registration, this.yarnProxyUser}); + LOG.log(Level.INFO, "Instantiated YarnContainerManager: {0} {1}, trackingUrl: {2}, fileSystemUrl: {3}, " + + "jobSubmissionDirectory: {4}.", + new Object[] {this.registration, this.yarnProxyUser, this.trackingUrl, this.fileSystemUrl, + this.jobSubmissionDirectory}); } /** http://git-wip-us.apache.org/repos/asf/reef/blob/47568d78/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/FileSystemUrl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/FileSystemUrl.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/FileSystemUrl.java new file mode 100644 index 0000000..6b9efc9 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/FileSystemUrl.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.yarn.driver.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The file system URL. + * For Hadoop file system, it is set in fs.defaultFS as default by YARN environment. + * For Data Lake, for example, Yarn applications are required to set the complete path by themselves. + * Example is adl://reefadl.azuredatalakestore.net. + */ +@NamedParameter(doc = "The File System URL.", default_value = "NULL") +public final class FileSystemUrl implements Name<String> { + + private FileSystemUrl() { + } + + public static final String DEFAULT_VALUE = "NULL"; +}
