Repository: reef Updated Branches: refs/heads/master 2d0246926 -> 18c24b1c7
[REEF-1820] Specify node names and relaxLocality flag in Evaluator request Addresses the following: * Specify node names and relaxLocality flag in Evaluator request in .NET * Specify node names and relaxLocality flag in Evaluator request in Java * Modify the Bridge to pass the changes from .NET to Java * Extend HelloREEF for YARN as a sample/e2e test for the current changes JIRA: [REEF-1820](https://issues.apache.org/jira/browse/REEF-1820) Pull request: This closes #1360 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/18c24b1c Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/18c24b1c Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/18c24b1c Branch: refs/heads/master Commit: 18c24b1c760ef9afc1ea45d4c54cce8e97f3b29f Parents: 2d02469 Author: Julia Wang <jul...@apache.org> Authored: Mon Aug 7 19:59:06 2017 -0700 Committer: Shravan Narayanamurthy <shra...@apache.org> Committed: Tue Aug 8 19:17:17 2017 -0700 ---------------------------------------------------------------------- .../EvaluatorRequestorClr2Java.cpp | 6 +- lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp | 15 ++ lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h | 4 + .../Bridge/Events/EvaluatorRequestor.cs | 4 +- .../Evaluator/EvaluatorRequest.cs | 32 +++- .../Evaluator/EvaluatorRequestBuilder.cs | 45 +++++- .../Evaluator/IEvaluatorRequest.cs | 23 ++- .../HelloDriverYarn.cs | 122 ++++++++++++++ .../HelloREEF.cs | 2 +- .../HelloREEFYarn.cs | 161 +++++++++++++++++++ .../Org.Apache.REEF.Examples.HelloREEF.csproj | 3 + .../Org.Apache.REEF.Examples.HelloREEF/Run.cs | 38 +++++ .../javabridge/EvaluatorRequestorBridge.java | 6 +- .../reef/driver/evaluator/EvaluatorRequest.java | 56 ++++++- .../common/driver/EvaluatorRequestorImpl.java | 20 +-- 15 files changed, 510 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp index 7616c2c..90fe5d4 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp @@ -53,7 +53,7 @@ namespace Org { ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit"); JNIEnv *env = RetrieveEnv(_jvm); jclass jclassEvaluatorRequestor = env->GetObjectClass(_jobjectEvaluatorRequestor); - jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIILjava/lang/String;Ljava/lang/String;)V"); + jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIIZLjava/lang/String;Ljava/lang/String;Ljava/util/ArrayList;)V"); if (jmidSubmit == NULL) { fprintf(stdout, " jmidSubmit is NULL\n"); @@ -66,8 +66,10 @@ namespace Org { request->Number, request->MemoryMegaBytes, request->VirtualCore, + request->RelaxLocality, JavaStringFromManagedString(env, request->Rack), - JavaStringFromManagedString(env, request->RuntimeName)); + JavaStringFromManagedString(env, request->RuntimeName), + JavaArrayListFromManagedList(env, request->NodeNames)); ManagedLog::LOGGER->LogStop("EvaluatorRequestorClr2Java::Submit"); } http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp index 628e7ba..4402c44 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp @@ -70,6 +70,21 @@ jstring JavaStringFromManagedString( return env->NewString((const jchar*)wch, 
managedString->Length); } +jobject JavaArrayListFromManagedList( + JNIEnv *env, + System::Collections::Generic::ICollection<String^>^ managedNodeNames) { + + jclass arrayListClazz = (*env).FindClass("java/util/ArrayList"); + jobject arrayListObj = (*env).NewObject(arrayListClazz, (*env).GetMethodID(arrayListClazz, "<init>", "()V")); + + for each (String^ nodeName in managedNodeNames) + { + jstring nodeNamestr = JavaStringFromManagedString(env, nodeName); + (*env).CallBooleanMethod(arrayListObj, (*env).GetMethodID(arrayListClazz, "add", "(Ljava/lang/Object;)Z"), nodeNamestr); + } + return arrayListObj; +} + void HandleClr2JavaError( JNIEnv *env, String^ errorMessage, http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h index c06377c..bbaf369 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h +++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h @@ -48,6 +48,10 @@ jstring JavaStringFromManagedString( JNIEnv *env, String^ managedString); +jobject JavaArrayListFromManagedList( + JNIEnv *env, + System::Collections::Generic::ICollection<String^>^ managedNodeNames); + array<byte>^ ManagedByteArrayFromJavaByteArray( JNIEnv *env, jbyteArray javaByteArray); http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs index 31cb36f..504ae17 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using 
System.Globalization; +using System.Linq; using System.Runtime.Serialization; using Org.Apache.REEF.Common.Catalog; using Org.Apache.REEF.Common.Evaluator; @@ -64,7 +65,8 @@ namespace Org.Apache.REEF.Driver.Bridge.Events public void Submit(IEvaluatorRequest request) { - LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3} and runtime {4}.", request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack, request.RuntimeName); + LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3} runtime {4}, nodeNames to schedule {5} and RelaxLocality is {6}.", + request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack, request.RuntimeName, string.Join(",", request.NodeNames.ToArray()), request.RelaxLocality); lock (Evaluators) { for (var i = 0; i < request.Number; i++) http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs index 685da50..2c8365d 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs @@ -16,6 +16,8 @@ // under the License. 
using System; +using System.Collections.Generic; +using System.Linq; using System.Runtime.Serialization; namespace Org.Apache.REEF.Driver.Evaluator @@ -27,37 +29,43 @@ namespace Org.Apache.REEF.Driver.Evaluator internal class EvaluatorRequest : IEvaluatorRequest { internal EvaluatorRequest() - : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty) + : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true) { } internal EvaluatorRequest(int number, int megaBytes) - : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty) + : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true) { } internal EvaluatorRequest(int number, int megaBytes, int core) - : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N"), string.Empty) + : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true) { } internal EvaluatorRequest(int number, int megaBytes, string rack) - : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), string.Empty) + : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true) { } internal EvaluatorRequest(int number, int megaBytes, int core, string rack) - : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N"), string.Empty) + : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true) { } - internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId) - : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty) + internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames) + : this(number, megaBytes, core, rack, evaluatorBatchId, 
string.Empty, nodeNames, true) { } - internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, string runtimeName) + internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames, bool relaxLocality) + : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, relaxLocality) + + { + } + + internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, string runtimeName, ICollection<string> nodeNames, bool relaxLocality) { Number = number; MemoryMegaBytes = megaBytes; @@ -65,6 +73,8 @@ namespace Org.Apache.REEF.Driver.Evaluator Rack = rack; EvaluatorBatchId = evaluatorBatchId; RuntimeName = runtimeName; + NodeNames = nodeNames; + RelaxLocality = relaxLocality; } [DataMember] @@ -85,6 +95,12 @@ namespace Org.Apache.REEF.Driver.Evaluator [DataMember] public string RuntimeName { get; private set; } + [DataMember] + public ICollection<string> NodeNames { get; private set; } + + [DataMember] + public bool RelaxLocality { get; private set; } + internal static EvaluatorRequestBuilder NewBuilder() { return new EvaluatorRequestBuilder(); http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs index d6e5a63..6ccd1fb 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs @@ -16,6 +16,8 @@ // under the License. 
using System; +using System.Collections.Generic; +using System.Linq; using Org.Apache.REEF.Common.Runtime; namespace Org.Apache.REEF.Driver.Evaluator @@ -25,6 +27,8 @@ namespace Org.Apache.REEF.Driver.Evaluator private string _evaluatorBatchId; private string _rackName; private string _runtimeName; + private ICollection<string> _nodeNames; + private bool _relaxLocality; internal EvaluatorRequestBuilder(IEvaluatorRequest request) { @@ -34,6 +38,8 @@ namespace Org.Apache.REEF.Driver.Evaluator _evaluatorBatchId = request.EvaluatorBatchId; _rackName = request.Rack; _runtimeName = request.RuntimeName; + _nodeNames = request.NodeNames; + _relaxLocality = request.RelaxLocality; } internal EvaluatorRequestBuilder() @@ -44,6 +50,8 @@ namespace Org.Apache.REEF.Driver.Evaluator _rackName = string.Empty; _evaluatorBatchId = Guid.NewGuid().ToString("N"); _runtimeName = string.Empty; + _nodeNames = Enumerable.Empty<string>().ToList(); + _relaxLocality = true; } public int Number { get; private set; } @@ -95,7 +103,29 @@ namespace Org.Apache.REEF.Driver.Evaluator } /// <summary> - /// Sets the batch ID for requested evaluators in the same request. The batch of Evaluators requested in the + /// Set desired node names for the Evaluator to be allocated on. + /// </summary> + /// <param name="nodeNames"></param> + /// <returns>this</returns> + public EvaluatorRequestBuilder AddNodeNames(ICollection<string> nodeNames) + { + _nodeNames = nodeNames; + return this; + } + + /// <summary> + /// Set a desired node name for evaluator to be allocated + /// </summary> + /// <param name="nodeName"></param> + /// <returns></returns> + public EvaluatorRequestBuilder AddNodeName(string nodeName) + { + _nodeNames.Add(nodeName); + return this; + } + + /// <summary> + /// Sets the batch ID for requested evaluators in the same request. The batch of Evaluators requested in the /// same request will have the same Evaluator Batch ID. 
/// </summary> /// <param name="evaluatorBatchId">The batch ID for the Evaluator request.</param> @@ -117,12 +147,23 @@ namespace Org.Apache.REEF.Driver.Evaluator } /// <summary> + /// Set the relax locality for requesting evaluator with specified node names + /// </summary> + /// <param name="relaxLocality">Locality relax flag.</param> + /// <returns>this</returns> + public EvaluatorRequestBuilder SetRelaxLocality(bool relaxLocality) + { + _relaxLocality = relaxLocality; + return this; + } + + /// <summary> /// Build the EvaluatorRequest. /// </summary> /// <returns></returns> public IEvaluatorRequest Build() { - return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName); + return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName, nodeNames: _nodeNames, relaxLocality: _relaxLocality); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs index 6b2ce21..357ffe3 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +using System.Collections.Generic; + namespace Org.Apache.REEF.Driver.Evaluator { /// <summary> @@ -43,14 +45,33 @@ namespace Org.Apache.REEF.Driver.Evaluator string Rack { get; } /// <summary> + /// The desired node names for the Evaluator to be allocated on. + /// </summary> + ICollection<string> NodeNames { get; } + + /// <summary> /// The batch ID for requested evaluators. 
Evaluators requested in the same batch /// will have the same Batch ID. /// </summary> string EvaluatorBatchId { get; } /// <summary> - /// The name of the runtime to allocate teh evaluator on + /// The name of the runtime to allocate the evaluator on /// </summary> string RuntimeName { get; } + + /// <summary> + /// For a request at a network hierarchy level, set whether locality can be relaxed to that level and beyond. + /// If the flag is off on a rack-level ResourceRequest, containers at that request's priority + /// will not be assigned to nodes on that request's rack unless requests specifically for + /// those nodes have also been submitted. + /// If the flag is off on an ANY-level ResourceRequest, containers at that request's priority + /// will only be assigned on racks for which specific requests have also been submitted. + /// For example, to request a container strictly on a specific node, the corresponding rack-level + /// and any-level requests should have locality relaxation set to false. Similarly, + /// to request a container strictly on a specific rack, + /// the corresponding any-level request should have locality relaxation set to false. + /// </summary> + bool RelaxLocality { get; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriverYarn.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriverYarn.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriverYarn.cs new file mode 100644 index 0000000..9c84d33 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriverYarn.cs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Examples.HelloREEF +{ + /// <summary> + /// The Driver for HelloREEF: It requests a single Evaluator and then submits the HelloTask to it. 
+ /// </summary> + public sealed class HelloDriverYarn : IObserver<IAllocatedEvaluator>, IObserver<IDriverStarted> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(HelloDriver)); + private readonly IEvaluatorRequestor _evaluatorRequestor; + + /// <summary> + /// List of node names for desired evaluators + /// </summary> + private readonly IList<string> _nodeNames; + + /// <summary> + /// Specify if the desired node names is relaxed + /// </summary> + private readonly bool _relaxLocality; + + /// <summary> + /// Constructor of the driver + /// </summary> + /// <param name="evaluatorRequestor">Evaluator Requestor</param> + /// <param name="nodeNames">Node names for evaluators</param> + /// <param name="relaxLocality">Relax indicator of evaluator node request</param> + [Inject] + private HelloDriverYarn(IEvaluatorRequestor evaluatorRequestor, + [Parameter(typeof(NodeNames))] ISet<string> nodeNames, + [Parameter(typeof(RelaxLocality))] bool relaxLocality) + { + _evaluatorRequestor = evaluatorRequestor; + _nodeNames = nodeNames.ToList(); + _relaxLocality = relaxLocality; + } + + /// <summary> + /// Submits the HelloTask to the Evaluator. 
+ /// </summary> + /// <param name="allocatedEvaluator"></param> + public void OnNext(IAllocatedEvaluator allocatedEvaluator) + { + Logger.Log(Level.Info, "Received allocatedEvaluator-HostName: " + allocatedEvaluator.GetEvaluatorDescriptor().NodeDescriptor.HostName); + var taskConfiguration = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "HelloTask") + .Set(TaskConfiguration.Task, GenericType<HelloTask>.Class) + .Build(); + allocatedEvaluator.SubmitTask(taskConfiguration); + } + + public void OnError(Exception error) + { + throw error; + } + + public void OnCompleted() + { + } + + /// <summary> + /// Called to start the user mode driver + /// </summary> + /// <param name="driverStarted"></param> + public void OnNext(IDriverStarted driverStarted) + { + Logger.Log(Level.Info, string.Format("HelloDriver started at {0}", driverStarted.StartTime)); + + if (_nodeNames != null && _nodeNames.Count > 0) + { + _evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder() + .AddNodeNames(_nodeNames) + .SetMegabytes(64) + .SetNumber(_nodeNames.Count) + .SetRelaxLocality(_relaxLocality) + .Build()); + } + else + { + _evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder() + .SetMegabytes(64) + .Build()); + } + } + } + + [NamedParameter(documentation: "Set of node names for evaluators")] + internal class NodeNames : Name<ISet<string>> + { + } + + [NamedParameter(documentation: "RelaxLocality for specifying evaluator node names", shortName: "RelaxLocality", defaultValue: "true")] + internal class RelaxLocality : Name<bool> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs index 575a13f..20c80fa 100644 --- 
a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs +++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs @@ -103,7 +103,7 @@ namespace Org.Apache.REEF.Examples.HelloREEF } } - public static void Main(string[] args) + public static void MainSimple(string[] args) { TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(args.Length > 0 ? args[0] : Local)).GetInstance<HelloREEF>().Run(); } http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs new file mode 100644 index 0000000..fbccdec --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using Org.Apache.REEF.Client.API; +using Org.Apache.REEF.Client.Common; +using Org.Apache.REEF.Client.Yarn; +using Org.Apache.REEF.Client.YARN.RestClient.DataModel; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Examples.HelloREEF +{ + /// <summary> + /// A Tool that submits HelloREEFDriver for execution on YARN. + /// </summary> + public sealed class HelloREEFYarn + { + private const int ReTryCounts = 200; + private const int SleepTime = 2000; + private const string DefaultPortRangeStart = "2000"; + private const string DefaultPortRangeCount = "20"; + private const string TrustedApplicationTokenIdentifier = "TrustedApplicationTokenIdentifier"; + private const string SecurityTokenId = "SecurityTokenId"; + private const string SecurityTokenPwd = "SecurityTokenPwd"; + + private readonly IREEFClient _reefClient; + private readonly JobRequestBuilder _jobRequestBuilder; + + private static readonly Logger Logger = Logger.GetLogger(typeof(HelloREEFYarn)); + + /// <summary> + /// List of node names for evaluators + /// </summary> + private readonly IList<string> _nodeNames; + + [Inject] + private HelloREEFYarn(IREEFClient reefClient, + JobRequestBuilder jobRequestBuilder, + [Parameter(typeof(NodeNames))] ISet<string> nodeNames) + { + _reefClient = reefClient; + _jobRequestBuilder = jobRequestBuilder; + _nodeNames = nodeNames.ToList(); + } + + /// <summary> + /// Runs HelloREEF using the IREEFClient passed into the constructor. 
+ /// </summary> + private void Run() + { + // The driver configuration contains all the needed handler bindings + var helloDriverConfiguration = DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<HelloDriverYarn>.Class) + .Set(DriverConfiguration.OnDriverStarted, GenericType<HelloDriverYarn>.Class) + .Build(); + + var driverConfig = TangFactory.GetTang() + .NewConfigurationBuilder(helloDriverConfiguration); + + foreach (var n in _nodeNames) + { + driverConfig.BindSetEntry<NodeNames, string>(GenericType<NodeNames>.Class, n); + } + + // The JobSubmission contains the Driver configuration as well as the files needed on the Driver. + var helloJobRequest = _jobRequestBuilder + .AddDriverConfiguration(driverConfig.Build()) + .AddGlobalAssemblyForType(typeof(HelloDriverYarn)) + .SetJobIdentifier("HelloREEF") + .SetJavaLogLevel(JavaLoggingSetting.Verbose) + .Build(); + + var result = _reefClient.SubmitAndGetJobStatus(helloJobRequest); + var state = PullFinalJobStatus(result); + Logger.Log(Level.Info, "Application state : {0}.", state); + } + + /// <summary> + /// This is to pull job final status until the Job is done + /// </summary> + /// <param name="jobSubmitionResult"></param> + /// <returns></returns> + private FinalState PullFinalJobStatus(IJobSubmissionResult jobSubmitionResult) + { + int n = 0; + var state = jobSubmitionResult.FinalState; + while (state.Equals(FinalState.UNDEFINED) && n++ < ReTryCounts) + { + Thread.Sleep(SleepTime); + state = jobSubmitionResult.FinalState; + } + return state; + } + + /// <summary> + /// Get runtime configuration + /// </summary> + /// <returns></returns> + private static IConfiguration GetRuntimeConfiguration(string[] args) + { + var c = YARNClientConfiguration.ConfigurationModule + .Set(YARNClientConfiguration.SecurityTokenKind, TrustedApplicationTokenIdentifier) + .Set(YARNClientConfiguration.SecurityTokenService, TrustedApplicationTokenIdentifier) + .Build(); + + 
File.WriteAllText(SecurityTokenId, args[0]); + File.WriteAllText(SecurityTokenPwd, args[1]); + + IConfiguration tcpPortConfig = TcpPortConfigurationModule.ConfigurationModule + .Set(TcpPortConfigurationModule.PortRangeStart, args.Length > 2 ? args[2] : DefaultPortRangeStart) + .Set(TcpPortConfigurationModule.PortRangeCount, args.Length > 3 ? args[3] : DefaultPortRangeCount) + .Build(); + + return Configurations.Merge(c, tcpPortConfig); + } + + /// <summary> + /// HelloREEF example running on YARN + /// Usage: Org.Apache.REEF.Examples.HelloREEF SecurityTokenId SecurityTokenPw [portRangerStart] [portRangeCount] [nodeName1] [nodeName2]... + /// </summary> + /// <param name="args"></param> + public static void MainYarn(string[] args) + { + var configBuilder = TangFactory.GetTang() + .NewConfigurationBuilder(GetRuntimeConfiguration(args)); + + if (args.Length > 4) + { + for (int i = 4; i < args.Length; i++) + { + configBuilder.BindSetEntry<NodeNames, string>(GenericType<NodeNames>.Class, args[i]); + } + } + + TangFactory.GetTang().NewInjector(configBuilder.Build()).GetInstance<HelloREEFYarn>().Run(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj index d60e48c..be7fed1 100644 --- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj +++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj @@ -31,9 +31,12 @@ <Link>Properties\SharedAssemblyInfo.cs</Link> </Compile> <Compile Include="HelloDriver.cs" /> + <Compile Include="HelloDriverYarn.cs" /> <Compile Include="HelloREEF.cs" /> + <Compile Include="HelloREEFYarn.cs" /> 
<Compile Include="HelloTask.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="Run.cs" /> </ItemGroup> <ItemGroup> <None Include="$(SolutionDir)\App.config"> http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Run.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Run.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Run.cs new file mode 100644 index 0000000..f8aa715 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Run.cs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +namespace Org.Apache.REEF.Examples.HelloREEF +{ + public sealed class Run + { + /// <summary> + /// Program that runs hello reef + /// </summary> + /// <param name="args"></param> + public static void Main(string[] args) + { + if (args.Length < 2) + { + HelloREEF.MainSimple(args); + } + else + { + HelloREEFYarn.MainYarn(args); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java index 0db4388..518537d 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java @@ -66,8 +66,10 @@ public final class EvaluatorRequestorBridge extends NativeBridge { public void submit(final int evaluatorsNumber, final int memory, final int virtualCore, + final boolean relaxLocality, final String rack, - final String runtimeName) { + final String runtimeName, + final ArrayList<String> nodeNames) { if (this.isBlocked) { throw new RuntimeException("Cannot request additional Evaluator, this is probably because " + "the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433."); @@ -85,6 +87,8 @@ public final class EvaluatorRequestorBridge extends NativeBridge { .setMemory(memory) .setNumberOfCores(virtualCore) .setRuntimeName(runtimeName) + .setRelaxLocality(relaxLocality) + .addNodeNames(nodeNames) .build(); LOG.log(Level.FINE, "submitting evaluator request {0}", request); 
http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java index 38494ac..dfed8f6 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java @@ -40,6 +40,7 @@ public final class EvaluatorRequest { private final List<String> nodeNames; private final List<String> rackNames; private final String runtimeName; + private final boolean relaxLocality; EvaluatorRequest(final int number, final int megaBytes, @@ -55,12 +56,24 @@ public final class EvaluatorRequest { final List<String> nodeNames, final List<String> rackNames, final String runtimeName) { + this(number, megaBytes, cores, nodeNames, rackNames, runtimeName, true); + } + + + EvaluatorRequest(final int number, + final int megaBytes, + final int cores, + final List<String> nodeNames, + final List<String> rackNames, + final String runtimeName, + final boolean relaxLocality) { this.number = number; this.megaBytes = megaBytes; this.cores = cores; this.nodeNames = nodeNames; this.rackNames = rackNames; this.runtimeName = runtimeName; + this.relaxLocality = relaxLocality; } /** @@ -137,6 +150,16 @@ public final class EvaluatorRequest { } /** + * Access the locality relax flag. + * + * @return the value of relaxLocality. If not set default is true. + */ + public boolean getRelaxLocality() { + return relaxLocality; + } + + + /** * {@link EvaluatorRequest}s are build using this Builder. 
*/ public static class Builder<T extends Builder> implements org.apache.reef.util.Builder<EvaluatorRequest> { @@ -147,6 +170,7 @@ public final class EvaluatorRequest { private final List<String> nodeNames = new ArrayList<>(); private final List<String> rackNames = new ArrayList<>(); private String runtimeName = ""; + private boolean relaxLocality = true; //if not set, default to true @Private public Builder() { @@ -163,6 +187,7 @@ public final class EvaluatorRequest { setMemory(request.getMegaBytes()); setNumberOfCores(request.getNumberOfCores()); setRuntimeName(request.getRuntimeName()); + setRelaxLocality(request.getRelaxLocality()); for (final String nodeName : request.getNodeNames()) { addNodeName(nodeName); } @@ -233,6 +258,21 @@ public final class EvaluatorRequest { } /** + * Adds node names.They are the preferred locations where the evaluator should + * run on. If any of the node is available, the RM will try to allocate the + * evaluator there + * + * @param nodeNamesList preferred node names + * @return this Builder. + */ + public T addNodeNames(final List<String> nodeNamesList) { + if(nodeNamesList != null) { + this.nodeNames.addAll(nodeNamesList); + } + return (T) this; + } + + /** * Adds a rack name. It is the preferred location where the evaluator should * run on. If the rack is available, the RM will try to allocate the * evaluator in one of its nodes. The RM will try to match node names first, @@ -247,11 +287,25 @@ public final class EvaluatorRequest { } /** + * A boolean relaxLocality flag defaulting to true, which tells the ResourceManager + * if the application wants locality to be loose (i.e. allows fall-through to rack or any) + * or strict (i.e. specify hard constraint on resource allocation). + * + * @param relaxLocalityFlg locality relaxation is enabled with this ResourceRequest + * @return this Builder. 
+ */ + public T setRelaxLocality(final boolean relaxLocalityFlg) { + this.relaxLocality = relaxLocalityFlg; + return (T) this; + } + + /** * Builds the {@link EvaluatorRequest}. */ @Override public EvaluatorRequest build() { - return new EvaluatorRequest(this.n, this.megaBytes, this.cores, this.nodeNames, this.rackNames, this.runtimeName); + return new EvaluatorRequest(this.n, this.megaBytes, this.cores, this.nodeNames, + this.rackNames, this.runtimeName, this.relaxLocality); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java index b2faf98..992659f 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java @@ -29,6 +29,7 @@ import org.apache.reef.util.logging.LoggingScope; import org.apache.reef.util.logging.LoggingScopeFactory; import javax.inject.Inject; +import java.util.Arrays; import java.util.logging.Level; import java.util.logging.Logger; @@ -60,8 +61,11 @@ public final class EvaluatorRequestorImpl implements EvaluatorRequestor { @Override public synchronized void submit(final EvaluatorRequest req) { - LOG.log(Level.FINEST, "Got an EvaluatorRequest: number: {0}, memory = {1}, cores = {2}.", - new Object[] {req.getNumber(), req.getMegaBytes(), req.getNumberOfCores()}); + if (LOG.isLoggable(Level.FINEST)) { + LOG.log(Level.FINEST, "Got an EvaluatorRequest: number: {0}, memory = {1}, cores = {2}.", + new Object[] {req.getNumber(), req.getMegaBytes(), req.getNumberOfCores()}); + LOG.log(Level.FINEST, 
"Node names: " + Arrays.toString(req.getNodeNames().toArray())); + } if (req.getMegaBytes() <= 0) { throw new IllegalArgumentException("Given an unsupported memory size: " + req.getMegaBytes()); @@ -82,22 +86,18 @@ public final class EvaluatorRequestorImpl implements EvaluatorRequestor { throw new IllegalArgumentException("Runtime name cannot be null"); } // for backwards compatibility, we will always set the relax locality flag - // to true unless the user configured racks, in which case we will check for - // the ANY modifier (*), if not there, then we won't relax the locality - boolean relaxLocality = true; + // to true unless the user has set it to false in the request, in which case + // we will check for the ANY modifier (*), if there, then we relax the + // locality regardless of the value set in the request. + boolean relaxLocality = req.getRelaxLocality(); if (!req.getRackNames().isEmpty()) { for (final String rackName : req.getRackNames()) { if (Constants.ANY_RACK.equals(rackName)) { relaxLocality = true; break; } - relaxLocality = false; } } - // if the user specified any node, then we assume they do not want to relax locality - if (!req.getNodeNames().isEmpty()) { - relaxLocality = false; - } try (LoggingScope ls = this.loggingScopeFactory.evaluatorSubmit(req.getNumber())) { final ResourceRequestEvent request = ResourceRequestEventImpl