Repository: reef Updated Branches: refs/heads/master e18b5d36a -> 987240cb7
[REEF-1891] Allow passing job submission environment variables from .Net client * Update AvroYarnClusterJobSubmissionParameters schema for environment map * Update Java bridge client to deserialize and set the map * Set the map to ContainerLaunchContext * Expose it in JobRequest and JobParameters in C# * Serialize it in YarnREEFParamSerializer. * Update unit test at both Java and C# * Updated HelloREEFYarn to set it. JIRA: [REEF-1891](https://issues.apache.org/jira/browse/REEF-1891) Pull request: This closes #1381 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/987240cb Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/987240cb Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/987240cb Branch: refs/heads/master Commit: 987240cb7c36befdda1b168c51231002de41c24d Parents: e18b5d3 Author: Julia Wang <[email protected]> Authored: Fri Sep 22 16:23:28 2017 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed Oct 18 13:25:01 2017 -0700 ---------------------------------------------------------------------- .../YarnREEFParamSerializerTests.cs | 9 +- .../Org.Apache.REEF.Client/API/JobParameters.cs | 15 +- .../API/JobParametersBuilder.cs | 16 ++ .../API/JobRequestBuilder.cs | 12 ++ .../AvroYarnClusterJobSubmissionParameters.cs | 13 +- .../Org.Apache.REEF.Client.csproj | 1 + .../Org.Apache.REEF.Client/YARN/Environment.cs | 156 +++++++++++++++++++ .../YarnREEFParamSerializer.cs | 3 + .../HelloREEFYarn.cs | 4 +- .../src/main/avro/JobSubmissionParameters.avsc | 1 + .../client/YarnClusterSubmissionFromCS.java | 26 ++++ .../bridge/client/YarnJobSubmissionClient.java | 1 + ...SubmissionParametersSerializationFromCS.java | 6 +- .../yarn/client/YarnSubmissionHelper.java | 26 +++- .../reef/runtime/yarn/util/YarnTypes.java | 34 +++- 15 files changed, 311 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs index 23c8725..040a9f8 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs +++ b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs @@ -159,6 +159,7 @@ namespace Org.Apache.REEF.Client.Tests "\"securityTokenService\":\"{0}\"," + "\"maxApplicationSubmissions\":{1}," + "\"driverMemory\":{1}," + + "\"envMap\":{{\"key1\":\"{0}\",\"key2\":\"{0}\"}}," + "\"driverStdoutFilePath\":\"{0}\"," + "\"driverStderrFilePath\":\"{0}\"" + "}}"; @@ -178,15 +179,17 @@ namespace Org.Apache.REEF.Client.Tests .SetJobIdentifier(AnyString) .SetMaxApplicationSubmissions(AnyInt) .SetDriverMemory(AnyInt) + .SetJobSubmissionEnvironmentVariable("key1", AnyString) + .SetJobSubmissionEnvironmentVariable("key2", AnyString) .SetDriverStderrFilePath(AnyString) .SetDriverStdoutFilePath(AnyString) .Build(); var serializedBytes = serializer.SerializeJobArgsToBytes(jobRequest.JobParameters, AnyString); - var expectedString = Encoding.UTF8.GetString(serializedBytes); - var jsonObject = JObject.Parse(expectedString); + var actualString = Encoding.UTF8.GetString(serializedBytes); + var actualJsonObject = JObject.Parse(actualString); var expectedJsonObject = JObject.Parse(expectedJson); - Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject)); + Assert.True(JToken.DeepEquals(actualJsonObject, expectedJsonObject)); } private sealed class DriverStartHandler : IObserver<IDriverStarted> http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs b/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs index 6d6c307..5279e3a 100644 --- a/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs +++ b/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +using System; +using System.Collections.Generic; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; @@ -30,6 +32,7 @@ namespace Org.Apache.REEF.Client.API private readonly string _jobIdentifier; private readonly int _maxApplicationSubmissions; private readonly int _driverMemory; + private IDictionary<string, string> _jobSubmissionEnvMap; private readonly Optional<string> _stdoutFilePath; private readonly Optional<string> _stderrFilePath; private readonly JavaLoggingSetting _logSetting; @@ -38,6 +41,7 @@ namespace Org.Apache.REEF.Client.API string jobIdentifier, int maxApplicationSubmissions, int driverMemory, + IDictionary<string, string> jobSubmissionEnvMap, string stdoutFilePath, string stderrFilePath, JavaLoggingSetting logSetting) @@ -45,7 +49,8 @@ namespace Org.Apache.REEF.Client.API _jobIdentifier = jobIdentifier; _maxApplicationSubmissions = maxApplicationSubmissions; _driverMemory = driverMemory; - + _jobSubmissionEnvMap = jobSubmissionEnvMap; + _stdoutFilePath = string.IsNullOrWhiteSpace(stdoutFilePath) ? Optional<string>.Empty() : Optional<string>.Of(stdoutFilePath); @@ -81,6 +86,14 @@ namespace Org.Apache.REEF.Client.API } /// <summary> + /// The job submission environment variable map. + /// </summary> + public IDictionary<string, string> JobSubmissionEnvMap + { + get { return new Dictionary<string, string>(_jobSubmissionEnvMap); } + } + + /// <summary> /// Gets the file path for stdout for the driver. 
/// </summary> public Optional<string> StdoutFilePath http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs b/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs index 06ab7ee..39df14e 100644 --- a/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs +++ b/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +using System.Collections.Generic; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Client.API @@ -29,6 +30,7 @@ namespace Org.Apache.REEF.Client.API private int _driverMemory = 512; private string _stdoutFilePath = null; private string _stderrFilePath = null; + private readonly IDictionary<string, string> _jobSubmissionMap = new Dictionary<string, string>(); private JavaLoggingSetting _javaLogLevel = JavaLoggingSetting.Info; private JobParametersBuilder() @@ -53,6 +55,7 @@ namespace Org.Apache.REEF.Client.API _jobIdentifier, _maxApplicationSubmissions, _driverMemory, + _jobSubmissionMap, _stdoutFilePath, _stderrFilePath, _javaLogLevel); @@ -89,6 +92,19 @@ namespace Org.Apache.REEF.Client.API } /// <summary> + /// Set job submission environment variable. + /// If the variable is already in the map, override it. + /// </summary> + /// <param name="key"></param> + /// <param name="value"></param> + /// <returns></returns> + public JobParametersBuilder SetJobSubmissionEnvironmentVariable(string key, string value) + { + _jobSubmissionMap[key] = value; + return this; + } + + /// <summary> /// Sets the file path to the stdout file for the driver. 
/// </summary> public JobParametersBuilder SetDriverStdoutFilePath(string stdoutFilePath) http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs b/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs index 325ed04..234cd7a 100644 --- a/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs +++ b/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs @@ -148,6 +148,18 @@ namespace Org.Apache.REEF.Client.API } /// <summary> + /// Set a job submission environment variable. + /// </summary> + /// <param name="key">key of the environment variable.</param> + /// <param name="value">Value of the environment variable.</param> + /// <returns></returns> + public JobRequestBuilder SetJobSubmissionEnvironmentVariable(string key, string value) + { + _jobParametersBuilder.SetJobSubmissionEnvironmentVariable(key, value); + return this; + } + + /// <summary> /// Sets the maximum amount of times a job can be submitted. /// </summary> public JobRequestBuilder SetMaxApplicationSubmissions(int maxAppSubmissions) http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs index c4e82fb..9bf2a64 100644 --- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs +++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+using System.Collections.Generic; using System.Runtime.Serialization; using Org.Apache.REEF.Utilities.Attributes; @@ -28,7 +29,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] public sealed class AvroYarnClusterJobSubmissionParameters { - private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""fileSystemUrl"",""type"":""string""},{""name"":""jobSubmissi onDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}"; + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission 
client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""fileSystemUrl"",""type"":""string""},{""name"":""jobSubmissi onDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""envMap"",""type"":{""type"":""map"",""values"":""string""}},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}"; /// <summary> /// Gets the schema. @@ -66,6 +67,12 @@ namespace Org.Apache.REEF.Client.Avro.YARN public int driverMemory { get; set; } /// <summary> + /// Gets or sets the envMap field. + /// </summary> + [DataMember] + public IDictionary<string, string> envMap { get; set; } + + /// <summary> /// Gets or sets the maxApplicationSubmissions field. 
/// </summary> [DataMember] @@ -99,15 +106,17 @@ namespace Org.Apache.REEF.Client.Avro.YARN /// <param name="securityTokenKind">The securityTokenKind.</param> /// <param name="securityTokenService">The securityTokenService.</param> /// <param name="driverMemory">The driverMemory.</param> + /// <param name="envMap">The envMap.</param> /// <param name="maxApplicationSubmissions">The maxApplicationSubmissions.</param> /// <param name="driverStdoutFilePath">The driverStdoutFilePath.</param> /// <param name="driverStderrFilePath">The driverStderrFilePath.</param> - public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService, int driverMemory, int maxApplicationSubmissions, string driverStdoutFilePath, string driverStderrFilePath) + public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService, int driverMemory, IDictionary<string, string> envMap, int maxApplicationSubmissions, string driverStdoutFilePath, string driverStderrFilePath) { this.yarnJobSubmissionParameters = yarnJobSubmissionParameters; this.securityTokenKind = securityTokenKind; this.securityTokenService = securityTokenService; this.driverMemory = driverMemory; + this.envMap = envMap; this.maxApplicationSubmissions = maxApplicationSubmissions; this.driverStdoutFilePath = driverStdoutFilePath; this.driverStderrFilePath = driverStderrFilePath; http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj index 7748895..a0b4f75 100644 --- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj +++ 
b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj @@ -100,6 +100,7 @@ under the License. <Compile Include="Local\Parameters\NumberOfEvaluators.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="YARN\ApplicationReport.cs" /> + <Compile Include="YARN\Environment.cs" /> <Compile Include="YARN\HDI\HDInsightClientConfiguration.cs" /> <Compile Include="YARN\HDI\HDInsightCommandLineEnvironment.cs" /> <Compile Include="YARN\HDI\HDInsightCredential.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs b/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs new file mode 100644 index 0000000..4704f2b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Org.Apache.REEF.Client.YARN +{ + /// <summary> + /// Default environment map keys from YARN. + /// <a href="http://hadoop.apache.org/docs/r2.6.0/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html"> + /// Hadoop RM REST API</a> documentation. 
+ /// </summary> + public enum Environment + { + /** + * USER + */ + USER, + + /** + * LOGNAME + */ + LOGNAME, + + /** + * HOME + */ + HOME, + + /** + * PWD + */ + PWD, + + /** + * PATH + */ + PATH, + + /** + * SHELL + */ + SHELL, + + /** + * JAVA_HOME + */ + JAVA_HOME, + + /** + * CLASSPATH + */ + CLASSPATH, + + /** + * APP_CLASSPATH + */ + APP_CLASSPATH, + + /** + * HADOOP_CLASSPATH. + */ + HADOOP_CLASSPATH, + + /** + * LD_LIBRARY_PATH + */ + LD_LIBRARY_PATH, + + /** + * HADOOP_CONF_DIR + */ + HADOOP_CONF_DIR, + + /** + * HADOOP_CLIENT_CONF_DIR Final, non-modifiable. + */ + HADOOP_CLIENT_CONF_DIR, + + /** + * $HADOOP_COMMON_HOME + */ + HADOOP_COMMON_HOME, + + /** + * $HADOOP_HDFS_HOME + */ + HADOOP_HDFS_HOME, + + /** + * $MALLOC_ARENA_MAX + */ + MALLOC_ARENA_MAX, + + /** + * $HADOOP_YARN_HOME + */ + HADOOP_YARN_HOME, + + /** + * $CLASSPATH_PREPEND_DISTCACHE + * Private, Windows specific + */ + CLASSPATH_PREPEND_DISTCACHE, + + /** + * $CONTAINER_ID + * Exported by NodeManager and non-modifiable by users. + */ + CONTAINER_ID, + + /** + * $NM_HOST + * Exported by NodeManager and non-modifiable by users. + */ + NM_HOST, + + /** + * $NM_HTTP_PORT + * Exported by NodeManager and non-modifiable by users. + */ + NM_HTTP_PORT, + + /** + * $NM_PORT + * Exported by NodeManager and non-modifiable by users. + */ + NM_PORT, + + /** + * $LOCAL_DIRS + * Exported by NodeManager and non-modifiable by users. + */ + LOCAL_DIRS, + + /** + * $LOG_DIRS + * Exported by NodeManager and non-modifiable by users. + * Comma separate list of directories that the container should use for + * logging. 
+ */ + LOG_DIRS + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs index c09eaad..8fd6a4f 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +using System.Collections.Generic; +using System.Globalization; using System.IO; using Org.Apache.REEF.Client.API; using Org.Apache.REEF.Client.Avro; @@ -166,6 +168,7 @@ namespace Org.Apache.REEF.Client.YARN yarnJobSubmissionParameters = avroYarnJobSubmissionParameters, driverMemory = jobParameters.DriverMemoryInMB, + envMap = jobParameters.JobSubmissionEnvMap, maxApplicationSubmissions = jobParameters.MaxApplicationSubmissions, driverStdoutFilePath = string.IsNullOrWhiteSpace(jobParameters.StdoutFilePath.Value) ? 
_fileNames.GetDefaultYarnDriverStdoutFilePath() : jobParameters.StdoutFilePath.Value, http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs index 917e3eb..2285a50 100644 --- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs +++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs @@ -94,6 +94,8 @@ namespace Org.Apache.REEF.Examples.HelloREEF .AddDriverConfiguration(driverConfig.Build()) .AddGlobalAssemblyForType(typeof(HelloDriverYarn)) .SetJobIdentifier("HelloREEF") + .SetJobSubmissionEnvironmentVariable(Environment.PATH.ToString(), "value1") + .SetJobSubmissionEnvironmentVariable("UserDefineKey", "value2") .SetJavaLogLevel(JavaLoggingSetting.Verbose) .Build(); @@ -180,7 +182,7 @@ namespace Org.Apache.REEF.Examples.HelloREEF /// <summary> /// HelloREEF example running on YARN - /// Usage: Org.Apache.REEF.Examples.HelloREEF SecurityTokenId SecurityTokenPw [portRangerStart] [portRangeCount] [nodeName1] [nodeName2]... + /// Usage: Org.Apache.REEF.Examples.HelloREEF TrustedApplicaitonLLQ SecurityTokenPw [portRangerStart] [portRangeCount] [nodeName1] [nodeName2]... 
/// </summary> /// <param name="args"></param> public static void MainYarn(string[] args) http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc index 62bc757..ecf0043 100644 --- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc +++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc @@ -60,6 +60,7 @@ { "name": "securityTokenKind", "type": "string", "default": "NULL" }, { "name": "securityTokenService", "type": "string", "default": "NULL" }, { "name": "driverMemory", "type": "int" }, + {"name": "environmentVariablesMap", "type": {"type": "map", "values": "string"}}, { "name": "maxApplicationSubmissions", "type": "int" }, { "name": "driverStdoutFilePath", "type": "string" }, { "name": "driverStderrFilePath", "type": "string" } http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java index 71968b9..b45a2f2 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java @@ -28,6 +28,8 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; /** * Represents a 
job submission from the CS code. @@ -57,6 +59,7 @@ final class YarnClusterSubmissionFromCS { private final String fileSystemUrl; private final String yarnDriverStdoutFilePath; private final String yarnDriverStderrFilePath; + private final Map<String, String> environmentVariablesMap = new HashMap<>(); private final AvroYarnAppSubmissionParameters yarnAppSubmissionParameters; private final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters; @@ -89,6 +92,13 @@ final class YarnClusterSubmissionFromCS { this.yarnDriverStdoutFilePath = yarnClusterJobSubmissionParameters.getDriverStdoutFilePath().toString(); this.yarnDriverStderrFilePath = yarnClusterJobSubmissionParameters.getDriverStderrFilePath().toString(); + if (yarnClusterJobSubmissionParameters.getEnvironmentVariablesMap() != null) { + for (Map.Entry<java.lang.CharSequence, java.lang.CharSequence> pair : + yarnClusterJobSubmissionParameters.getEnvironmentVariablesMap().entrySet()) { + this.environmentVariablesMap.put(pair.getKey().toString(), pair.getValue().toString()); + } + } + Validate.notEmpty(jobId, "The job id is null or empty"); Validate.isTrue(driverMemory > 0, "The amount of driver memory given is <= 0."); Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0."); @@ -121,9 +131,18 @@ final class YarnClusterSubmissionFromCS { ", tokenService='" + tokenService + '\'' + ", fileSystemUrl='" + fileSystemUrl + '\'' + ", jobSubmissionDirectoryPrefix='" + jobSubmissionDirectoryPrefix + '\'' + + envMapString() + '}'; } + private String envMapString() { + final StringBuilder sb = new StringBuilder(); + for (final Map.Entry<String, String> entry : environmentVariablesMap.entrySet()) { + sb.append(", Key:" + entry.getKey() + ", value:" + entry.getValue()); + } + return sb.toString(); + } + /** * @return The local folder where the driver is staged. */ @@ -181,6 +200,13 @@ final class YarnClusterSubmissionFromCS { } /** + * @return The environment map. 
+ */ + Map<String, String> getEnvironmentVariablesMap() { + return new HashMap<>(environmentVariablesMap); + } + + /** * @return The max amount of times the application can be submitted. */ int getMaxApplicationSubmissions(){ http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java index 0d71517..868201b 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java @@ -171,6 +171,7 @@ public final class YarnJobSubmissionClient { .setConfigurationFilePaths(confFiles) .setDriverStdoutPath(yarnSubmission.getYarnDriverStdoutFilePath()) .setDriverStderrPath(yarnSubmission.getYarnDriverStderrFilePath()) + .setJobSubmissionEnvMap(yarnSubmission.getEnvironmentVariablesMap()) .submit(); writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(), submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath()); http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java index b8024f9..f733131 100644 --- 
a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java +++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java @@ -65,6 +65,10 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { "\"securityTokenService\":\"" + NULL_REP + "\"," + "\"maxApplicationSubmissions\":" + NUMBER_REP + "," + "\"driverMemory\":" + NUMBER_REP + "," + + "\"environmentVariablesMap\":" + + "{" + + "\"key\":" + STRING_REP_QUOTED + + "}," + "\"driverStdoutFilePath\":" + STRING_REP_QUOTED + "," + "\"driverStderrFilePath\":" + STRING_REP_QUOTED + "}"; @@ -179,7 +183,7 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { assert yarnClusterSubmissionFromCS.getTokenService().equals(NULL_REP); assert yarnClusterSubmissionFromCS.getYarnDriverStderrFilePath().equals(STRING_REP); assert yarnClusterSubmissionFromCS.getYarnDriverStdoutFilePath().equals(STRING_REP); - + assert yarnClusterSubmissionFromCS.getEnvironmentVariablesMap().get("key").equals(STRING_REP); verifyYarnJobSubmissionParams(yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters(), yarnClusterSubmissionFromCS.getYarnAppSubmissionParameters()); } http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java index 5df89e7..7eec786 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java @@ 
-63,6 +63,7 @@ public final class YarnSubmissionHelper implements AutoCloseable { private String driverStderrFilePath; private Class launcherClazz = REEFLauncher.class; private List<String> configurationFilePaths; + private final Map<String, String> environmentVariablesMap = new HashMap<>(); public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration, final REEFFileNames fileNames, @@ -241,6 +242,29 @@ public final class YarnSubmissionHelper implements AutoCloseable { } /** + * Sets environment variable map. + * @param map + * @return + */ + public YarnSubmissionHelper setJobSubmissionEnvMap(final Map<String, String> map) { + for (final Map.Entry<String, String> entry : map.entrySet()) { + environmentVariablesMap.put(entry.getKey(), entry.getValue()); + } + return this; + } + + /** + * Adds a job submission environment variable. + * @param key + * @param value + * @return + */ + public YarnSubmissionHelper setJobSubmissionEnvVariable(final String key, final String value) { + environmentVariablesMap.put(key, value); + return this; + } + + /** * Sets the Driver stdout file path. * @param driverStdoutPath * @return @@ -278,7 +302,7 @@ public final class YarnSubmissionHelper implements AutoCloseable { } final ContainerLaunchContext containerLaunchContext = YarnTypes.getContainerLaunchContext( - launchCommand, this.resources, tokenProvider.getTokens()); + launchCommand, this.resources, tokenProvider.getTokens(), environmentVariablesMap); this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext); LOG.log(Level.INFO, "Submitting REEF Application to YARN. 
ID: {0}, driver core: {1}", http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java index 88b7134..cc22d40 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java @@ -51,27 +51,55 @@ public final class YarnTypes { final List<String> commands, final Map<String, LocalResource> localResources, final byte[] securityTokenBuffer) { - return getContainerLaunchContext(commands, localResources, securityTokenBuffer, null); + return getContainerLaunchContext(commands, localResources, securityTokenBuffer, + new HashMap<String, String>(), null); + } + + /** + * @return a ContainerLaunchContext with the given commands, LocalResources and environment map. + */ + public static ContainerLaunchContext getContainerLaunchContext( + final List<String> commands, + final Map<String, LocalResource> localResources, + final byte[] securityTokenBuffer, + final Map<String, String> envMap) { + return getContainerLaunchContext(commands, localResources, securityTokenBuffer, envMap, null); + } + + /** + * Gets a LaunchContext and sets the environment variable. + * @return a ContainerLaunchContext with the given commands and LocalResources. 
+ */ + public static ContainerLaunchContext getContainerLaunchContext( + final List<String> commands, + final Map<String, LocalResource> localResources, + final byte[] securityTokenBuffer, + final ApplicationId applicationId) { + return getContainerLaunchContext(commands, localResources, securityTokenBuffer, + new HashMap<String, String>(), null); } /** * Gets a LaunchContext and sets the environment variable * {@link YarnUtilities#REEF_YARN_APPLICATION_ID_ENV_VAR} for REEF Evaluators. - * @return a ContainerLaunchContext with the given commands and LocalResources. + * @return a ContainerLaunchContext with the given commands, LocalResources and environment map. */ public static ContainerLaunchContext getContainerLaunchContext( final List<String> commands, final Map<String, LocalResource> localResources, final byte[] securityTokenBuffer, + final Map<String, String> envMap, final ApplicationId applicationId) { final ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class); context.setLocalResources(localResources); context.setCommands(commands); - final Map<String, String> envMap = new HashMap<>(); if (applicationId != null) { envMap.put(YarnUtilities.REEF_YARN_APPLICATION_ID_ENV_VAR, applicationId.toString()); } + for (final Map.Entry entry : envMap.entrySet()) { + LOG.log(Level.FINE, "Key : {0}, Value : {1}", new Object[] {entry.getKey(), entry.getValue()}); + } context.setEnvironment(envMap); if (securityTokenBuffer != null) { context.setTokens(ByteBuffer.wrap(securityTokenBuffer));
