Repository: reef Updated Branches: refs/heads/master 61e0424d1 -> f089f88bb
[REEF-1205] Move submission-time parameters into AvroYarnJobSubmissionParameters This addressed the issue by * Removing DriverConfigurationOptions.MaxApplicationSubmissions and moving it into the IJobSubmission object. * Moving parameters from AppSubmissionParameters to JobSubmissionParameters. * Modifying tests and examples. JIRA: [REEF-1205](https://issues.apache.org/jira/browse/REEF-1205) Pull Request: Closes #840 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/f089f88b Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/f089f88b Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/f089f88b Branch: refs/heads/master Commit: f089f88bbaca604db3724f46159be8a9dd643b81 Parents: 61e0424 Author: Andrew Chung <[email protected]> Authored: Thu Feb 11 13:47:40 2016 -0800 Committer: Anupam <[email protected]> Committed: Tue Feb 16 15:31:05 2016 -0800 ---------------------------------------------------------------------- .../YarnREEFParamSerializerTests.cs | 46 +++++++----- .../API/IJobSubmission.cs | 5 ++ .../API/IJobSubmissionBuilder.cs | 7 +- .../Org.Apache.REEF.Client/API/JobSubmission.cs | 10 ++- .../API/JobSubmissionBuilder.cs | 12 +++- .../YARN/AvroClusterAppSubmissionParameters.cs | 73 -------------------- .../YARN/AvroYarnAppSubmissionParameters.cs | 12 +--- .../AvroYarnClusterJobSubmissionParameters.cs | 20 +++++- .../Org.Apache.REEF.Client.csproj | 14 ++-- .../YARN/YARNREEFClient.cs | 4 +- .../YARN/YarnREEFDotNetClient.cs | 4 +- .../YARN/YarnREEFDotNetParamSerializer.cs | 3 +- .../YARN/YarnREEFParamSerializer.cs | 23 +++--- .../Bridge/DriverBridgeConfigurationOptions.cs | 1 + .../DriverConfiguration.cs | 1 + .../DriverRestart.cs | 2 +- .../src/main/avro/AppSubmissionParameters.avsc | 11 --- .../src/main/avro/JobSubmissionParameters.avsc | 4 +- .../client/YarnClusterSubmissionFromCS.java | 20 +++--- ...SubmissionParametersSerializationFromCS.java | 13 ++-- 20 files changed, 119 insertions(+), 
166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs index 4fd1264..3b0b8fa 100644 --- a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs +++ b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs @@ -43,7 +43,6 @@ namespace Org.Apache.REEF.Client.Tests "\"tcpRangeCount\":{0}," + "\"tcpTryCount\":{0}" + "}}," + - "\"driverMemory\":{0}," + "\"driverRecoveryTimeout\":{0}" + "}}"; @@ -64,10 +63,11 @@ namespace Org.Apache.REEF.Client.Tests var serializer = injector.GetInstance<YarnREEFDotNetParamSerializer>(); var jobSubmission = injector.GetInstance<JobSubmissionBuilderFactory>() - .GetJobSubmissionBuilder().SetDriverMemory(AnyInt).Build(); + .GetJobSubmissionBuilder().Build(); var serializedBytes = serializer.SerializeAppArgsToBytes(jobSubmission, injector, AnyString); - var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes)); + var expectedString = Encoding.UTF8.GetString(serializedBytes); + var jsonObject = JObject.Parse(expectedString); var expectedJsonObject = JObject.Parse(expectedJson); Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject)); } @@ -94,7 +94,8 @@ namespace Org.Apache.REEF.Client.Tests .GetJobSubmissionBuilder().SetJobIdentifier(AnyString).Build(); var serializedBytes = serializer.SerializeJobArgsToBytes(jobSubmission, AnyString, AnyString); - var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes)); + var expectedString = Encoding.UTF8.GetString(serializedBytes); + var jsonObject = JObject.Parse(expectedString); var expectedJsonObject = JObject.Parse(expectedJson); 
Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject)); } @@ -103,16 +104,12 @@ namespace Org.Apache.REEF.Client.Tests public void TestYarnREEFAppSerialization() { const string formatString = "{{" + - "\"yarnAppSubmissionParameters\":" + - "{{\"sharedAppSubmissionParameters\":" + + "\"sharedAppSubmissionParameters\":" + "{{\"tcpBeginPort\":{0}," + "\"tcpRangeCount\":{0}," + "\"tcpTryCount\":{0}" + "}}," + - "\"driverMemory\":{0}," + "\"driverRecoveryTimeout\":{0}" + - "}}," + - "\"maxApplicationSubmissions\":{0}" + "}}"; var expectedJson = string.Format(formatString, AnyInt); @@ -125,16 +122,20 @@ namespace Org.Apache.REEF.Client.Tests var driverConf = DriverConfiguration.ConfigurationModule .Set(DriverConfiguration.OnDriverStarted, GenericType<DriverStartHandler>.Class) .Set(DriverConfiguration.DriverRestartEvaluatorRecoverySeconds, AnyInt.ToString()) - .Set(DriverConfiguration.MaxApplicationSubmissions, AnyInt.ToString()).Build(); + .Build(); var injector = TangFactory.GetTang().NewInjector(tcpConf, driverConf); var serializer = injector.GetInstance<YarnREEFParamSerializer>(); var jobSubmission = injector.GetInstance<JobSubmissionBuilderFactory>() - .GetJobSubmissionBuilder().SetDriverMemory(AnyInt).Build(); + .GetJobSubmissionBuilder() + .SetDriverMemory(AnyInt) + .SetMaxApplicationSubmissions(AnyInt) + .Build(); var serializedBytes = serializer.SerializeAppArgsToBytes(jobSubmission, injector); - var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes)); + var expectedString = Encoding.UTF8.GetString(serializedBytes); + var jsonObject = JObject.Parse(expectedString); var expectedJsonObject = JObject.Parse(expectedJson); Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject)); } @@ -150,10 +151,14 @@ namespace Org.Apache.REEF.Client.Tests "{{" + "\"jobId\":\"{0}\"," + "\"jobSubmissionFolder\":\"{0}\"" + - "}},\"dfsJobSubmissionFolder\":\"NULL\"," + + "}}," + + "\"dfsJobSubmissionFolder\":\"NULL\"," + 
"\"jobSubmissionDirectoryPrefix\":\"{0}\"" + "}}," + - "\"securityTokenKind\":\"{0}\",\"securityTokenService\":\"{0}\"" + + "\"securityTokenKind\":\"{0}\"," + + "\"securityTokenService\":\"{0}\"," + + "\"maxApplicationSubmissions\":{1}," + + "\"driverMemory\":{1}" + "}}"; var conf = YARNClientConfiguration.ConfigurationModule @@ -162,15 +167,20 @@ namespace Org.Apache.REEF.Client.Tests .Set(YARNClientConfiguration.JobSubmissionFolderPrefix, AnyString) .Build(); - var expectedJson = string.Format(formatString, AnyString); + var expectedJson = string.Format(formatString, AnyString, AnyInt); var injector = TangFactory.GetTang().NewInjector(conf); var serializer = injector.GetInstance<YarnREEFParamSerializer>(); var jobSubmission = injector.GetInstance<JobSubmissionBuilderFactory>() - .GetJobSubmissionBuilder().SetJobIdentifier(AnyString).Build(); + .GetJobSubmissionBuilder() + .SetJobIdentifier(AnyString) + .SetMaxApplicationSubmissions(AnyInt) + .SetDriverMemory(AnyInt) + .Build(); - var serializedBytes = serializer.SerializeJobArgsToBytes(jobSubmission, AnyString); - var jsonObject = JObject.Parse(Encoding.UTF8.GetString(serializedBytes)); + var serializedBytes = serializer.SerializeJobArgsToBytes(jobSubmission, injector, AnyString); + var expectedString = Encoding.UTF8.GetString(serializedBytes); + var jsonObject = JObject.Parse(expectedString); var expectedJsonObject = JObject.Parse(expectedJson); Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject)); } http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/API/IJobSubmission.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/API/IJobSubmission.cs b/lang/cs/Org.Apache.REEF.Client/API/IJobSubmission.cs index f45f655..dac9023 100644 --- a/lang/cs/Org.Apache.REEF.Client/API/IJobSubmission.cs +++ b/lang/cs/Org.Apache.REEF.Client/API/IJobSubmission.cs @@ -57,6 +57,11 @@ namespace Org.Apache.REEF.Client.API int 
DriverMemory { get; } /// <summary> + /// The maximum amount of times an application can be submitted. + /// </summary> + int MaxApplicationSubmissions { get; } + + /// <summary> /// The Job's identifier /// </summary> string JobIdentifier { get; } http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/API/IJobSubmissionBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/API/IJobSubmissionBuilder.cs b/lang/cs/Org.Apache.REEF.Client/API/IJobSubmissionBuilder.cs index a9e430c..4a35b41 100644 --- a/lang/cs/Org.Apache.REEF.Client/API/IJobSubmissionBuilder.cs +++ b/lang/cs/Org.Apache.REEF.Client/API/IJobSubmissionBuilder.cs @@ -71,11 +71,16 @@ namespace Org.Apache.REEF.Client.API IJobSubmissionBuilder SetJobIdentifier(string id); /// <summary> - /// Set driver memory in megabytes + /// Set driver memory in megabytes. /// </summary> IJobSubmissionBuilder SetDriverMemory(int driverMemoryInMb); /// <summary> + /// Set the maximum amount of times a job can be submitted. 
+ /// </summary> + IJobSubmissionBuilder SetMaxApplicationSubmissions(int maxAppSubmissions); + + /// <summary> /// Driver config file contents (Org.Apache.REEF.Bridge.exe.config) contents /// Can be used to redirect assembly versions /// </summary> http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/API/JobSubmission.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobSubmission.cs b/lang/cs/Org.Apache.REEF.Client/API/JobSubmission.cs index cb51203..a37d18a 100644 --- a/lang/cs/Org.Apache.REEF.Client/API/JobSubmission.cs +++ b/lang/cs/Org.Apache.REEF.Client/API/JobSubmission.cs @@ -31,6 +31,7 @@ namespace Org.Apache.REEF.Client.API private readonly ISet<string> _localAssemblies; private readonly ISet<string> _localFiles; private readonly int _driverMemory; + private readonly int _maxAppSubmissions; private readonly string _jobIdentifier; private readonly string _driverConfigurationFileContents; @@ -42,7 +43,8 @@ namespace Org.Apache.REEF.Client.API ISet<string> localFiles, int driverMemory, string jobIdentifier, - string driverConfigurationFileContents) + string driverConfigurationFileContents, + int maxAppSubmissions) { _driverConfigurations = driverConfigurations; _globalAssemblies = globalAssemblies; @@ -52,6 +54,7 @@ namespace Org.Apache.REEF.Client.API _driverMemory = driverMemory; _jobIdentifier = jobIdentifier; _driverConfigurationFileContents = driverConfigurationFileContents; + _maxAppSubmissions = maxAppSubmissions; } /// <summary> @@ -90,6 +93,11 @@ namespace Org.Apache.REEF.Client.API get { return _driverMemory; } } + int IJobSubmission.MaxApplicationSubmissions + { + get { return _maxAppSubmissions; } + } + /// <summary> /// The Job's identifier /// </summary> http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/API/JobSubmissionBuilder.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobSubmissionBuilder.cs b/lang/cs/Org.Apache.REEF.Client/API/JobSubmissionBuilder.cs index bfd459d..eab3fab 100644 --- a/lang/cs/Org.Apache.REEF.Client/API/JobSubmissionBuilder.cs +++ b/lang/cs/Org.Apache.REEF.Client/API/JobSubmissionBuilder.cs @@ -30,6 +30,7 @@ namespace Org.Apache.REEF.Client.API private readonly ISet<string> _localAssemblies = new HashSet<string>(); private readonly ISet<string> _localFiles = new HashSet<string>(); private int _driverMemory = 512; + private int _maxAppSubmissions = 1; private string _jobIdentifier; private readonly ISet<IConfigurationProvider> _configurationProviders; private string _driverConfigurationFileContents; @@ -139,6 +140,15 @@ namespace Org.Apache.REEF.Client.API } /// <summary> + /// Sets the maximum amount of times a job can be submitted. + /// </summary> + public IJobSubmissionBuilder SetMaxApplicationSubmissions(int maxAppSubmissions) + { + _maxAppSubmissions = maxAppSubmissions; + return this; + } + + /// <summary> /// Driver config file contents (Org.Apache.REEF.Bridge.exe.config) contents /// Can be use to redirect assembly versions /// </summary> @@ -173,7 +183,7 @@ namespace Org.Apache.REEF.Client.API } return new JobSubmission(_driverConfigurations, _globalAssemblies, _globalFiles, _localAssemblies, - _localFiles, _driverMemory, _jobIdentifier, _driverConfigurationFileContents); + _localFiles, _driverMemory, _jobIdentifier, _driverConfigurationFileContents, _maxAppSubmissions); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs deleted file mode 100644 index 
a3d0866..0000000 --- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroClusterAppSubmissionParameters.cs +++ /dev/null @@ -1,73 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -using System.Runtime.Serialization; -using Org.Apache.REEF.Utilities.Attributes; - -namespace Org.Apache.REEF.Client.Avro.YARN -{ - /// <summary> - /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroYarnClusterAppSubmissionParameters. 
- /// </summary> - [Private] - [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] - public sealed class AvroYarnClusterAppSubmissionParameters - { - private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterAppSubmissionParameters"",""doc"":""Cross-language application submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters"",""doc"":""General cross-language application submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters"",""doc"":""General cross-language application submission parameters shared by all runtimes"",""fields"":[{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""}]}},{""name"":""maxApplicationSubmissions"",""type"":""int""}]}"; - - /// <summary> - /// Gets the schema. - /// </summary> - public static string Schema - { - get - { - return JsonSchema; - } - } - - /// <summary> - /// Gets or sets the yarnAppSubmissionParameters field. - /// </summary> - [DataMember] - public AvroYarnAppSubmissionParameters yarnAppSubmissionParameters { get; set; } - - /// <summary> - /// Gets or sets the maxApplicationSubmissions field. - /// </summary> - [DataMember] - public int maxApplicationSubmissions { get; set; } - - /// <summary> - /// Initializes a new instance of the <see cref="AvroYarnClusterAppSubmissionParameters"/> class. 
- /// </summary> - public AvroYarnClusterAppSubmissionParameters() - { - } - - /// <summary> - /// Initializes a new instance of the <see cref="AvroYarnClusterAppSubmissionParameters"/> class. - /// </summary> - /// <param name="yarnAppSubmissionParameters">The yarnAppSubmissionParameters.</param> - /// <param name="maxApplicationSubmissions">The maxApplicationSubmissions.</param> - public AvroYarnClusterAppSubmissionParameters(AvroYarnAppSubmissionParameters yarnAppSubmissionParameters, int maxApplicationSubmissions) - { - this.yarnAppSubmissionParameters = yarnAppSubmissionParameters; - this.maxApplicationSubmissions = maxApplicationSubmissions; - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs index 0612800..e52d63a 100644 --- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs +++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnAppSubmissionParameters.cs @@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] public sealed class AvroYarnAppSubmissionParameters { - private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters"",""doc"":""General cross-language application submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters"",""doc"":""General cross-language application submission parameters shared by all 
runtimes"",""fields"":[{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""}]}"; + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters"",""doc"":""General cross-language application submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedAppSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters"",""doc"":""General cross-language application submission parameters shared by all runtimes"",""fields"":[{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""}]}},{""name"":""driverRecoveryTimeout"",""type"":""int""}]}"; /// <summary> /// Gets the schema. @@ -48,12 +48,6 @@ namespace Org.Apache.REEF.Client.Avro.YARN public AvroAppSubmissionParameters sharedAppSubmissionParameters { get; set; } /// <summary> - /// Gets or sets the driverMemory field. - /// </summary> - [DataMember] - public int driverMemory { get; set; } - - /// <summary> /// Gets or sets the driverRecoveryTimeout field. /// </summary> [DataMember] @@ -70,12 +64,10 @@ namespace Org.Apache.REEF.Client.Avro.YARN /// Initializes a new instance of the <see cref="AvroYarnAppSubmissionParameters"/> class. 
/// </summary> /// <param name="sharedAppSubmissionParameters">The sharedAppSubmissionParameters.</param> - /// <param name="driverMemory">The driverMemory.</param> /// <param name="driverRecoveryTimeout">The driverRecoveryTimeout.</param> - public AvroYarnAppSubmissionParameters(AvroAppSubmissionParameters sharedAppSubmissionParameters, int driverMemory, int driverRecoveryTimeout) + public AvroYarnAppSubmissionParameters(AvroAppSubmissionParameters sharedAppSubmissionParameters, int driverRecoveryTimeout) { this.sharedAppSubmissionParameters = sharedAppSubmissionParameters; - this.driverMemory = driverMemory; this.driverRecoveryTimeout = driverRecoveryTimeout; } } http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs index 159c8cf..2bac1e4 100644 --- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs +++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs @@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")] public sealed class AvroYarnClusterJobSubmissionParameters { - private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN 
runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""}]}"; + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""maxApplicationSubmissions"",""type"":""int""}]}"; /// <summary> /// Gets the schema. 
@@ -60,6 +60,18 @@ namespace Org.Apache.REEF.Client.Avro.YARN public string securityTokenService { get; set; } /// <summary> + /// Gets or sets the driverMemory field. + /// </summary> + [DataMember] + public int driverMemory { get; set; } + + /// <summary> + /// Gets or sets the maxApplicationSubmissions field. + /// </summary> + [DataMember] + public int maxApplicationSubmissions { get; set; } + + /// <summary> /// Initializes a new instance of the <see cref="AvroYarnClusterJobSubmissionParameters"/> class. /// </summary> public AvroYarnClusterJobSubmissionParameters() @@ -74,11 +86,15 @@ namespace Org.Apache.REEF.Client.Avro.YARN /// <param name="yarnJobSubmissionParameters">The yarnJobSubmissionParameters.</param> /// <param name="securityTokenKind">The securityTokenKind.</param> /// <param name="securityTokenService">The securityTokenService.</param> - public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService) + /// <param name="driverMemory">The driverMemory.</param> + /// <param name="maxApplicationSubmissions">The maxApplicationSubmissions.</param> + public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService, int driverMemory, int maxApplicationSubmissions) { this.yarnJobSubmissionParameters = yarnJobSubmissionParameters; this.securityTokenKind = securityTokenKind; this.securityTokenService = securityTokenService; + this.driverMemory = driverMemory; + this.maxApplicationSubmissions = maxApplicationSubmissions; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj index 
60316e4..608035b 100644 --- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj +++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj @@ -26,7 +26,7 @@ under the License. <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> <FileAlignment>512</FileAlignment> <RestorePackages>true</RestorePackages> - <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir> + <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir> </PropertyGroup> <PropertyGroup> <StartupObject /> @@ -72,7 +72,6 @@ under the License. <Compile Include="Avro\AvroAppSubmissionParameters.cs" /> <Compile Include="Avro\AvroJobSubmissionParameters.cs" /> <Compile Include="Avro\Local\AvroLocalAppSubmissionParameters.cs" /> - <Compile Include="Avro\YARN\AvroClusterAppSubmissionParameters.cs" /> <Compile Include="Avro\YARN\AvroYarnAppSubmissionParameters.cs" /> <Compile Include="Avro\YARN\AvroYarnJobSubmissionParameters.cs" /> <Compile Include="Avro\YARN\AvroYarnClusterJobSubmissionParameters.cs" /> @@ -317,13 +316,10 @@ under the License. --> <Target Name="CopyJarFiles"> <MSBuild Targets="Build" BuildInParallel="$(BuildInParallel)" Properties="Chip=$(Chip);Lang=$(Lang)" Projects="@(ProjectFile)" /> - <ItemGroup> - <MySourceFiles Include="$(Bindir)\**\Org.Apache.REEF.Bridge.JAR\*.jar"/> + <ItemGroup> + <MySourceFiles Include="$(Bindir)\**\Org.Apache.REEF.Bridge.JAR\*.jar" /> </ItemGroup> - <Copy - SourceFiles="@(MySourceFiles)" - DestinationFiles="@(MySourceFiles->'$(TargetDir)%(Filename)%(Extension)')" - /> + <Copy SourceFiles="@(MySourceFiles)" DestinationFiles="@(MySourceFiles->'$(TargetDir)%(Filename)%(Extension)')" /> </Target> <Target Name="RewriteClientResources" DependsOnTargets="CopyJarFiles"> <UpdateClientResources ProjectFolder="$(REEF_Source_Folder)" DebugOrRelease="$(Configuration)" resxOutputPath="$(TempResxFile)"> @@ -335,4 +331,4 @@ under the License. 
</Target> <Target Name="BeforeBuild" DependsOnTargets="$(BeforeBuildDependsOn);RewriteClientResources"> </Target> -</Project> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs index 81912ea..174ae2c 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs @@ -26,6 +26,7 @@ using Org.Apache.REEF.Client.Yarn.RestClient; using Org.Apache.REEF.Client.YARN; using Org.Apache.REEF.Client.YARN.RestClient.DataModel; using Org.Apache.REEF.Common.Files; +using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Utilities.Logging; @@ -111,8 +112,7 @@ namespace Org.Apache.REEF.Client.Yarn // TODO: Remove this when we have a generalized way to pass config to java var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray()); - - var submissionJobArgsFilePath = _paramSerializer.SerializeJobFile(jobSubmission, driverFolderPath); + var submissionJobArgsFilePath = _paramSerializer.SerializeJobFile(jobSubmission, paramInjector, driverFolderPath); var submissionAppArgsFilePath = _paramSerializer.SerializeAppFile(jobSubmission, paramInjector, driverFolderPath); // Submit the driver http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs index 33ab5b9..41697f2 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs +++ 
b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs @@ -94,8 +94,6 @@ namespace Org.Apache.REEF.Client.YARN // prepare configuration var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray()); - var maxApplicationSubmissions = - paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions, int>(); _paramSerializer.SerializeAppFile(jobSubmission, paramInjector, localDriverFolderPath); _paramSerializer.SerializeJobFile(jobSubmission, localDriverFolderPath, jobSubmissionDirectory); @@ -115,7 +113,7 @@ namespace Org.Apache.REEF.Client.YARN var submissionReq = CreateApplicationSubmissionRequest(jobSubmission, applicationId, - maxApplicationSubmissions, + jobSubmission.MaxApplicationSubmissions, jobResources); var submittedApplication = _yarnRMClient.SubmitApplicationAsync(submissionReq).GetAwaiter().GetResult(); Log.Log(Level.Info, @"Submitted application {0}", submittedApplication.Id); http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs index dd28702..91445df 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs @@ -69,9 +69,8 @@ namespace Org.Apache.REEF.Client.YARN var avroYarnAppSubmissionParameters = new AvroYarnAppSubmissionParameters { sharedAppSubmissionParameters = avroAppSubmissionParameters, - driverMemory = jobSubmission.DriverMemory, driverRecoveryTimeout = - paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>(), + paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>() }; 
return AvroJsonSerializer<AvroYarnAppSubmissionParameters>.ToBytes(avroYarnAppSubmissionParameters); http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs index e2278e4..9bfa93c 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs @@ -81,25 +81,18 @@ namespace Org.Apache.REEF.Client.YARN var avroYarnAppSubmissionParameters = new AvroYarnAppSubmissionParameters { sharedAppSubmissionParameters = avroAppSubmissionParameters, - driverMemory = jobSubmission.DriverMemory, driverRecoveryTimeout = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>() }; - var avroYarnClusterAppSubmissionParameters = new AvroYarnClusterAppSubmissionParameters - { - yarnAppSubmissionParameters = avroYarnAppSubmissionParameters, - maxApplicationSubmissions = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions, int>() - }; - - return AvroJsonSerializer<AvroYarnClusterAppSubmissionParameters>.ToBytes(avroYarnClusterAppSubmissionParameters); + return AvroJsonSerializer<AvroYarnAppSubmissionParameters>.ToBytes(avroYarnAppSubmissionParameters); } /// <summary> /// Serializes the job parameters to job-submission-params.json. 
/// </summary> - internal string SerializeJobFile(IJobSubmission jobSubmission, string driverFolderPath) + internal string SerializeJobFile(IJobSubmission jobSubmission, IInjector paramInjector, string driverFolderPath) { - var serializedArgs = SerializeJobArgsToBytes(jobSubmission, driverFolderPath); + var serializedArgs = SerializeJobArgsToBytes(jobSubmission, paramInjector, driverFolderPath); var submissionArgsFilePath = Path.Combine(driverFolderPath, _fileNames.GetJobSubmissionParametersFile()); using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew)) @@ -110,7 +103,7 @@ namespace Org.Apache.REEF.Client.YARN return submissionArgsFilePath; } - internal byte[] SerializeJobArgsToBytes(IJobSubmission jobSubmission, string driverFolderPath) + internal byte[] SerializeJobArgsToBytes(IJobSubmission jobSubmission, IInjector paramInjector, string driverFolderPath) { var avroJobSubmissionParameters = new AvroJobSubmissionParameters { @@ -124,11 +117,17 @@ namespace Org.Apache.REEF.Client.YARN sharedJobSubmissionParameters = avroJobSubmissionParameters }; + var maxApplicationSubmissions = jobSubmission.MaxApplicationSubmissions == 1 + ? 
paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions, int>() + : jobSubmission.MaxApplicationSubmissions; + var avroYarnClusterJobSubmissionParameters = new AvroYarnClusterJobSubmissionParameters { securityTokenKind = _securityTokenKind, securityTokenService = _securityTokenService, - yarnJobSubmissionParameters = avroYarnJobSubmissionParameters + yarnJobSubmissionParameters = avroYarnJobSubmissionParameters, + driverMemory = jobSubmission.DriverMemory, + maxApplicationSubmissions = maxApplicationSubmissions }; return AvroJsonSerializer<AvroYarnClusterJobSubmissionParameters>.ToBytes(avroYarnClusterJobSubmissionParameters); http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs index 797b461..995a958 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs @@ -143,6 +143,7 @@ namespace Org.Apache.REEF.Driver.Bridge { } + [Obsolete("Deprecated in 0.14, will be removed.")] [NamedParameter("The number of times an application should be submitted in case of failure.", "MaxApplicationSubmissions", "1")] public class MaxApplicationSubmissions : Name<int> { http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs index dc599b4..8242721 100644 --- a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs @@ -171,6 +171,7 @@ 
namespace Org.Apache.REEF.Driver /// <summary> /// The number of times the application should be submitted in case of failures /// </summary> + [Obsolete("Deprecated in 0.14, will be removed.")] public static readonly OptionalParameter<int> MaxApplicationSubmissions = new OptionalParameter<int>(); http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs index 0e11175..d481489 100644 --- a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs +++ b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs @@ -64,7 +64,6 @@ namespace Org.Apache.REEF.Examples.DriverRestart .Set(DriverConfiguration.OnDriverRestartEvaluatorFailed, GenericType<HelloRestartDriver>.Class) .Set(DriverConfiguration.OnDriverReconnect, GenericType<DefaultYarnClusterHttpDriverConnection>.Class) .Set(DriverConfiguration.DriverRestartEvaluatorRecoverySeconds, (5 * 60).ToString()) - .Set(DriverConfiguration.MaxApplicationSubmissions, 2.ToString()) .Build(); // The JobSubmission contains the Driver configuration as well as the files needed on the Driver. 
@@ -72,6 +71,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart .AddDriverConfiguration(driverConfiguration) .AddGlobalAssemblyForType(typeof(HelloRestartDriver)) .SetJobIdentifier("DriverRestart_" + Guid.NewGuid().ToString().Substring(0, 6)) + .SetMaxApplicationSubmissions(2) .Build(); _reefClient.SubmitAndGetJobStatus(restartJobSubmission); http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc index 30205e1..d4926e9 100644 --- a/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc +++ b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc @@ -45,18 +45,7 @@ "doc": "General cross-language application submission parameters to the YARN runtime", "fields": [ { "name": "sharedAppSubmissionParameters", "type": "AvroAppSubmissionParameters" }, - { "name": "driverMemory", "type": "int" }, { "name": "driverRecoveryTimeout", "type": "int" } ] - }, - { - "namespace": "org.apache.reef.reef.bridge.client.avro", - "type": "record", - "name": "AvroYarnClusterAppSubmissionParameters", - "doc": "Cross-language application submission parameters to the YARN runtime using Hadoop's submission client", - "fields": [ - { "name": "yarnAppSubmissionParameters", "type": "AvroYarnAppSubmissionParameters" }, - { "name": "maxApplicationSubmissions", "type": "int" } - ] } ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc 
b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc index 61b9812..70dc14c 100644 --- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc +++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc @@ -46,7 +46,9 @@ "fields": [ { "name": "yarnJobSubmissionParameters", "type": "AvroYarnJobSubmissionParameters" }, { "name": "securityTokenKind", "type": "string", "default": "NULL" }, - { "name": "securityTokenService", "type": "string", "default": "NULL" } + { "name": "securityTokenService", "type": "string", "default": "NULL" }, + { "name": "driverMemory", "type": "int" }, + { "name": "maxApplicationSubmissions", "type": "int" } ] } ] http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java index 6fa3f83..44eaffc 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java @@ -58,25 +58,25 @@ final class YarnClusterSubmissionFromCS { private final AvroYarnAppSubmissionParameters yarnAppSubmissionParameters; private final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters; - private YarnClusterSubmissionFromCS(final AvroYarnClusterAppSubmissionParameters yarnClusterAppSubmissionParameters, + private YarnClusterSubmissionFromCS(final AvroYarnAppSubmissionParameters yarnAppSubmissionParameters, final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) { yarnJobSubmissionParameters = 
yarnClusterJobSubmissionParameters.getYarnJobSubmissionParameters(); - yarnAppSubmissionParameters = yarnClusterAppSubmissionParameters.getYarnAppSubmissionParameters(); + this.yarnAppSubmissionParameters = yarnAppSubmissionParameters; final AvroJobSubmissionParameters jobSubmissionParameters = yarnJobSubmissionParameters.getSharedJobSubmissionParameters(); final AvroAppSubmissionParameters appSubmissionParameters = - yarnAppSubmissionParameters.getSharedAppSubmissionParameters(); + this.yarnAppSubmissionParameters.getSharedAppSubmissionParameters(); this.driverFolder = new File(jobSubmissionParameters.getJobSubmissionFolder().toString()); this.jobId = jobSubmissionParameters.getJobId().toString(); this.tcpBeginPort = appSubmissionParameters.getTcpBeginPort(); this.tcpRangeCount = appSubmissionParameters.getTcpRangeCount(); this.tcpTryCount = appSubmissionParameters.getTcpTryCount(); - this.maxApplicationSubmissions = yarnClusterAppSubmissionParameters.getMaxApplicationSubmissions(); - this.driverRecoveryTimeout = yarnAppSubmissionParameters.getDriverRecoveryTimeout(); - this.driverMemory = yarnAppSubmissionParameters.getDriverMemory(); + this.maxApplicationSubmissions = yarnClusterJobSubmissionParameters.getMaxApplicationSubmissions(); + this.driverRecoveryTimeout = this.yarnAppSubmissionParameters.getDriverRecoveryTimeout(); + this.driverMemory = yarnClusterJobSubmissionParameters.getDriverMemory(); this.priority = DEFAULT_PRIORITY; this.queue = DEFAULT_QUEUE; this.tokenKind = yarnClusterJobSubmissionParameters.getSecurityTokenKind().toString(); @@ -215,10 +215,10 @@ final class YarnClusterSubmissionFromCS { static YarnClusterSubmissionFromCS readYarnClusterSubmissionFromCSFromInputStream( final InputStream appInputStream, final InputStream jobInputStream) throws IOException { final JsonDecoder appDecoder = DecoderFactory.get().jsonDecoder( - AvroYarnClusterAppSubmissionParameters.getClassSchema(), appInputStream); - final 
SpecificDatumReader<AvroYarnClusterAppSubmissionParameters> appReader = new SpecificDatumReader<>( - AvroYarnClusterAppSubmissionParameters.class); - final AvroYarnClusterAppSubmissionParameters yarnClusterAppSubmissionParameters = appReader.read(null, appDecoder); + AvroYarnAppSubmissionParameters.getClassSchema(), appInputStream); + final SpecificDatumReader<AvroYarnAppSubmissionParameters> appReader = new SpecificDatumReader<>( + AvroYarnAppSubmissionParameters.class); + final AvroYarnAppSubmissionParameters yarnClusterAppSubmissionParameters = appReader.read(null, appDecoder); final JsonDecoder jobDecoder = DecoderFactory.get().jsonDecoder( AvroYarnClusterJobSubmissionParameters.getClassSchema(), jobInputStream); http://git-wip-us.apache.org/repos/asf/reef/blob/f089f88b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java index c77caf7..accdd0c 100644 --- a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java +++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java @@ -62,7 +62,9 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { "{" + "\"yarnJobSubmissionParameters\":" + AVRO_YARN_JOB_PARAMETERS_SERIALIZED_STRING + "," + "\"securityTokenKind\":\"" + NULL_REP + "\"," + - "\"securityTokenService\":\"" + NULL_REP + "\"" + + "\"securityTokenService\":\"" + NULL_REP + "\"," + + "\"maxApplicationSubmissions\":" + NUMBER_REP + "," + + "\"driverMemory\":" + NUMBER_REP + "}"; 
private static final String AVRO_YARN_APP_PARAMETERS_SERIALIZED_STRING = @@ -73,16 +75,9 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { "\"tcpRangeCount\":" + NUMBER_REP + "," + "\"tcpTryCount\":" + NUMBER_REP + "}," + - "\"driverMemory\":" + NUMBER_REP + "," + "\"driverRecoveryTimeout\":" + NUMBER_REP + "}"; - private static final String AVRO_YARN_CLUSTER_APP_PARAMETERS_SERIALIZED_STRING = - "{" + - "\"yarnAppSubmissionParameters\":" + AVRO_YARN_APP_PARAMETERS_SERIALIZED_STRING + "," + - "\"maxApplicationSubmissions\":" + NUMBER_REP + - "}"; - /** * Tests deserialization of the Avro parameters for submission from the cluster from C#. * @throws IOException @@ -176,7 +171,7 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { private static YarnClusterSubmissionFromCS createYarnClusterSubmissionFromCS() throws IOException { try (final InputStream appStream = new ByteArrayInputStream( - AVRO_YARN_CLUSTER_APP_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) { + AVRO_YARN_APP_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) { try (final InputStream jobStream = new ByteArrayInputStream( AVRO_YARN_CLUSTER_JOB_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) {
