[REEF-1246] Multi runtime for c# driver This change enables specifying runtime name in C# driver
JIRA: [REEF-1246](https://issues.apache.org/jira/browse/REEF-1246) Pull Request: This closes #885 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/88ddc3a4 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/88ddc3a4 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/88ddc3a4 Branch: refs/heads/master Commit: 88ddc3a46b5d2c24ff6229ee278e47a7b4cd7ddd Parents: 1d19ea5 Author: Boris Shulman <[email protected]> Authored: Fri Apr 1 18:58:24 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Mon Apr 11 13:12:54 2016 -0700 ---------------------------------------------------------------------- lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 1 + .../EvaluatorRequestorClr2Java.cpp | 25 +++++- .../Org.Apache.REEF.Common.csproj | 1 + .../Runtime/RuntimeName.cs | 45 +++++++++++ .../Bridge/Avro/DefinedRuntimes.cs | 65 +++++++++++++++ .../Bridge/Avro/DefinedRuntimesSerializer.cs | 38 +++++++++ .../Clr2java/IEvaluatorRequestorClr2Java.cs | 2 + .../Bridge/Events/EvaluatorRequestor.cs | 21 ++++- .../Evaluator/EvaluatorDescriptorImpl.cs | 26 ++++-- .../Evaluator/EvaluatorRequest.cs | 22 +++-- .../Evaluator/EvaluatorRequestBuilder.cs | 18 ++++- .../Evaluator/IEvaluatorDescriptor.cs | 3 +- .../Evaluator/IEvaluatorRequest.cs | 7 ++ .../Org.Apache.REEF.Driver.csproj | 3 + lang/cs/Org.Apache.REEF.Driver/packages.config | 2 + .../Org.Apache.REEF.Evaluator.Tests/app.config | 29 +++++++ .../Functional/ReefFunctionalTest.cs | 3 +- ...uestingDriverSpecifyingDefaultRuntimeName.cs | 82 +++++++++++++++++++ ...uestingDriverSpecifyingInvalidRuntimeName.cs | 82 +++++++++++++++++++ ...atorRequestingDriverSpecifyingRuntimeName.cs | 84 ++++++++++++++++++++ .../Functional/RuntimeName/RuntimeNameTest.cs | 66 +++++++++++++-- .../Org.Apache.REEF.Tests.csproj | 3 + .../YarnBootstrapDriverConfigGenerator.java | 2 + lang/java/reef-bridge-java/pom.xml | 21 +++++ .../src/main/avro/DefinedRuntimes.avsc | 37 +++++++++ .../javabridge/EvaluatorRequestorBridge.java | 34 +++++++- .../reef/javabridge/FailedEvaluatorBridge.java | 7 +- .../reef/javabridge/generic/JobDriver.java | 17 ++-- .../utils/DefinedRuntimesSerializer.java | 73 +++++++++++++++++ .../reef/javabridge/utils/package-info.java | 22 +++++ .../driver/parameters/DefinedRuntimes.java | 31 ++++++++ .../client/HDInsightDriverConfiguration.java | 7 ++ ...DInsightDriverConfigurationProviderImpl.java | 4 +- .../hdinsight/driver/RuntimeIdentifier.java | 34 ++++++++ .../runtime/hdinsight/driver/package-info.java | 22 +++++ .../LocalDriverConfigurationProviderImpl.java | 4 +- .../local/client/LocalRuntimeConfiguration.java | 3 + .../local/driver/LocalDriverConfiguration.java | 6 ++ .../runtime/local/driver/RuntimeIdentifier.java | 3 + .../MesosDriverConfigurationProviderImpl.java | 2 + .../mesos/driver/MesosDriverConfiguration.java | 6 ++ .../runtime/mesos/driver/RuntimeIdentifier.java | 3 + ...MultiRuntimeDriverConfigurationProvider.java | 9 ++- .../driver/MultiRuntimeDriverConfiguration.java | 12 +-- .../YarnDriverConfigurationProviderImpl.java | 2 + .../runtime/yarn/driver/RuntimeIdentifier.java | 3 + .../yarn/driver/YarnDriverConfiguration.java | 6 ++ 47 files changed, 949 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h index 21a2f4a..d7eb713 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h +++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h @@ -104,6 +104,7 @@ namespace Org { !EvaluatorRequestorClr2Java(); virtual void OnError(String^ message); virtual void Submit(IEvaluatorRequest^ request); + virtual array<byte>^ GetDefinedRuntimes(); }; public ref class TaskMessageClr2Java : public ITaskMessageClr2Java { http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp index 9f1698a..0a4df03 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp @@ -53,7 +53,7 @@ namespace Org { ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit"); JNIEnv *env = RetrieveEnv(_jvm); jclass jclassEvaluatorRequestor = env->GetObjectClass(_jobjectEvaluatorRequestor); - jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIILjava/lang/String;)V"); + jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIILjava/lang/String;Ljava/lang/String;)V"); if (jmidSubmit == NULL) { fprintf(stdout, " jmidSubmit is NULL\n"); @@ -66,10 +66,31 @@ namespace Org { request->Number, request->MemoryMegaBytes, request->VirtualCore, - JavaStringFromManagedString(env, request->Rack)); + JavaStringFromManagedString(env, request->Rack), + JavaStringFromManagedString(env, request->RuntimeName)); ManagedLog::LOGGER->LogStop("EvaluatorRequestorClr2Java::Submit"); } + array<byte>^ EvaluatorRequestorClr2Java::GetDefinedRuntimes() { + ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::GetDefinedRuntimes"); + JNIEnv *env = RetrieveEnv(_jvm); + jclass jclassEvaluatorRequestor = env->GetObjectClass(_jobjectEvaluatorRequestor); + jmethodID jmidGetDefinedRuntimes = env->GetMethodID(jclassEvaluatorRequestor, "getDefinedRuntimes", "()[B"); + + if (jmidGetDefinedRuntimes == NULL) { + fprintf(stdout, " jmidGetDefinedRuntimes is NULL\n"); + fflush(stdout); + return nullptr; + } + + jbyteArray jBytes = (jbyteArray)env->CallObjectMethod( + _jobjectEvaluatorRequestor, + jmidGetDefinedRuntimes); + + ManagedLog::LOGGER->LogStop("EvaluatorRequestorClr2Java::GetDefinedRuntimes"); + return ManagedByteArrayFromJavaByteArray(env, jBytes); + } + void EvaluatorRequestorClr2Java::OnError(String^ message) { ManagedLog::LOGGER->Log("EvaluatorRequestorClr2Java::OnError"); JNIEnv *env = RetrieveEnv(_jvm); http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index 9a935c1..66ea5d9 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -165,6 +165,7 @@ under the License. <Compile Include="Runtime\Evaluator\Task\TaskStopImpl.cs" /> <Compile Include="Runtime\Evaluator\Utils\NamedparameterAlias.cs" /> <Compile Include="runtime\MachineStatus.cs" /> + <Compile Include="Runtime\RuntimeName.cs" /> <Compile Include="Services\ServiceConfiguration.cs" /> <Compile Include="Tasks\Defaults\DefaultTaskCloseHandler.cs" /> <Compile Include="Tasks\Defaults\DefaultDriverConnectionMessageHandler.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Common/Runtime/RuntimeName.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/RuntimeName.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/RuntimeName.cs new file mode 100644 index 0000000..59ea062 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/RuntimeName.cs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Org.Apache.REEF.Common.Runtime +{ + /// <summary> + /// This enum reflects runtime name values as they defined on teh Java side. + /// </summary> + public enum RuntimeName + { + /// <summary> + /// Same value as org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME + /// </summary> + Local, + + /// <summary> + /// Same value as org.apache.reef.runtime.yarn.driver.RuntimeIdentifier.RUNTIME_NAME + /// </summary> + Yarn, + + /// <summary> + /// Same value as org.apache.reef.runtime.mesos.driver.RuntimeIdentifier.RUNTIME_NAME + /// </summary> + Mesos, + + /// <summary> + /// Default value for the enum + /// </summary> + Default + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/DefinedRuntimes.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/DefinedRuntimes.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/DefinedRuntimes.cs new file mode 100644 index 0000000..fff6a2c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/DefinedRuntimes.cs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace Org.Apache.REEF.Driver.Bridge.Avro +{ + /// <summary> + /// Used to serialize and deserialize Avro record org.apache.reef.javabridge.avro.DefinedRuntimes. + /// </summary> + [DataContract(Name = "DefinedRuntimes", Namespace = "org.apache.reef.javabridge.avro")] + [KnownType(typeof(HashSet<string>))] + public class DefinedRuntimes + { + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.javabridge.avro.DefinedRuntimes"",""doc"":""Defines the schema for the defined runtime names. This avro object is used to pass runtime names to the c#"",""fields"":[{""name"":""runtimeNames"",""doc"":""defined runtime names"",""type"":{""type"":""array"",""items"":""string""}}]}"; + + /// <summary> + /// Gets the schema. + /// </summary> + public static string Schema + { + get + { + return JsonSchema; + } + } + + /// <summary> + /// Gets or sets the runtimeNames field. + /// </summary> + [DataMember] + public List<string> runtimeNames { get; set; } + + /// <summary> + /// Initializes a new instance of the <see cref="DefinedRuntimes"/> class. + /// </summary> + public DefinedRuntimes() + { + } + + /// <summary> + /// Initializes a new instance of the <see cref="DefinedRuntimes"/> class. + /// </summary> + /// <param name="runtimeNames">The runtimeNames.</param> + public DefinedRuntimes(ISet<string> runtimeNames) + { + this.runtimeNames = new List<string>(runtimeNames); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/DefinedRuntimesSerializer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/DefinedRuntimesSerializer.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/DefinedRuntimesSerializer.cs new file mode 100644 index 0000000..f6af37d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/DefinedRuntimesSerializer.cs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +using System.IO; + +using Microsoft.Hadoop.Avro; + +namespace Org.Apache.REEF.Driver.Bridge.Avro +{ + /// <summary> + /// Provides deserialization methods for DefinedRuntimes avro object + /// </summary> + internal static class DefinedRuntimesSerializer + { + private static readonly IAvroSerializer<DefinedRuntimes> Serializer = AvroSerializer.Create<DefinedRuntimes>(); + + public static DefinedRuntimes FromBytes(byte[] serializedData) + { + using (Stream stream = new MemoryStream(serializedData)) + { + return Serializer.Deserialize(stream); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IEvaluatorRequestorClr2Java.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IEvaluatorRequestorClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IEvaluatorRequestorClr2Java.cs index d118005..d19079a 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IEvaluatorRequestorClr2Java.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IEvaluatorRequestorClr2Java.cs @@ -24,5 +24,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java public interface IEvaluatorRequestorClr2Java : IClr2Java { void Submit(IEvaluatorRequest evaluatorRequest); + + byte[] GetDefinedRuntimes(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs index 43c17a5..9e75904 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs @@ -16,12 +16,13 @@ // under the License. using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Globalization; using System.Runtime.Serialization; + using Org.Apache.REEF.Common.Catalog; using Org.Apache.REEF.Common.Evaluator; +using Org.Apache.REEF.Driver.Bridge.Avro; using Org.Apache.REEF.Driver.Bridge.Clr2java; using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Utilities.Diagnostics; @@ -38,9 +39,14 @@ namespace Org.Apache.REEF.Driver.Bridge.Events private static readonly IDictionary<string, IEvaluatorDescriptor> EvaluatorDescriptorsDictionary = new Dictionary<string, IEvaluatorDescriptor>(); + private readonly DefinedRuntimes runtimes; + internal EvaluatorRequestor(IEvaluatorRequestorClr2Java clr2Java) { Clr2Java = clr2Java; + byte[] data = Clr2Java.GetDefinedRuntimes(); + runtimes = DefinedRuntimesSerializer.FromBytes(data); + LOGGER.Log(Level.Info, "Defined runtimes " + ((runtimes.runtimeNames == null) ? "null" : string.Join(",", runtimes.runtimeNames))); } /// <summary> @@ -59,13 +65,20 @@ namespace Org.Apache.REEF.Driver.Bridge.Events public void Submit(IEvaluatorRequest request) { - LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3}.", request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack); - + LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3} and runtime {4}.", request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack, request.RuntimeName); lock (Evaluators) { for (var i = 0; i < request.Number; i++) { - var descriptor = new EvaluatorDescriptorImpl(new NodeDescriptorImpl(), EvaluatorType.CLR, request.MemoryMegaBytes, request.VirtualCore, request.Rack); + if (!string.IsNullOrWhiteSpace(request.RuntimeName)) + { + if (runtimes.runtimeNames != null && !runtimes.runtimeNames.Contains(request.RuntimeName)) + { + throw new ArgumentException(string.Format("Requested runtime {0} is not in the defined runtimes list {1}", request.RuntimeName, string.Join(",", runtimes.runtimeNames))); + } + } + + var descriptor = new EvaluatorDescriptorImpl(new NodeDescriptorImpl(), EvaluatorType.CLR, request.MemoryMegaBytes, request.VirtualCore, request.RuntimeName, request.Rack); var key = string.Format(CultureInfo.InvariantCulture, "{0}{1}{2}", request.EvaluatorBatchId, BatchIdxSeparator, i); try { http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorDescriptorImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorDescriptorImpl.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorDescriptorImpl.cs index dc8a14f..b46e492 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorDescriptorImpl.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorDescriptorImpl.cs @@ -21,6 +21,7 @@ using System.Linq; using System.Net; using Org.Apache.REEF.Common.Catalog; using Org.Apache.REEF.Common.Evaluator; +using Org.Apache.REEF.Common.Runtime; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Diagnostics; @@ -39,17 +40,19 @@ namespace Org.Apache.REEF.Driver.Evaluator private readonly int _megaBytes; private readonly INodeDescriptor _nodeDescriptor; private readonly string _rack; - private readonly string _runtimeName; + private readonly RuntimeName _runtimeName; - // TODO[JIRA REEF-1054]: make runtimeName not optional - internal EvaluatorDescriptorImpl(INodeDescriptor nodeDescriptor, EvaluatorType type, int megaBytes, int core, string rack = DefaultRackName, string runtimeName = "") + internal EvaluatorDescriptorImpl(INodeDescriptor nodeDescriptor, EvaluatorType type, int megaBytes, int core, string runtimeName, string rack = DefaultRackName) { _nodeDescriptor = nodeDescriptor; _evaluatorType = type; _megaBytes = megaBytes; _core = core; _rack = rack; - _runtimeName = runtimeName; + if (!string.IsNullOrWhiteSpace(runtimeName) && !Enum.TryParse(runtimeName, true, out _runtimeName)) + { + throw new ArgumentException("Unknown runtime name received " + runtimeName); + } } /// <summary> @@ -71,12 +74,19 @@ namespace Org.Apache.REEF.Driver.Evaluator settings.Add(pair[0], pair[1]); } - // TODO[JIRA REEF-1054]: make runtimeName not optional - string runtimeName; - if (!settings.TryGetValue("RuntimeName", out runtimeName)) + string runtimeNameStr; + if (!settings.TryGetValue("RuntimeName", out runtimeNameStr)) { Exceptions.Throw(new ArgumentException("cannot find RuntimeName entry"), LOGGER); } + + RuntimeName runtimeName; + + if (!Enum.TryParse(runtimeNameStr, true, out runtimeName)) + { + Exceptions.Throw(new ArgumentException("cannot parse RuntimeName entry"), LOGGER); + } + string ipAddress; if (!settings.TryGetValue("IP", out ipAddress)) { @@ -145,7 +155,7 @@ namespace Org.Apache.REEF.Driver.Evaluator get { return _rack; } } - public string RuntimeName + public RuntimeName RuntimeName { get { return _runtimeName; } } http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs index 91bae8c..1f1a70c 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs @@ -18,6 +18,8 @@ using System; using System.Runtime.Serialization; +using Org.Apache.REEF.Common.Runtime; + namespace Org.Apache.REEF.Driver.Evaluator { /// <summary> @@ -27,37 +29,44 @@ namespace Org.Apache.REEF.Driver.Evaluator internal class EvaluatorRequest : IEvaluatorRequest { internal EvaluatorRequest() - : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N")) + : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty) { } internal EvaluatorRequest(int number, int megaBytes) - : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N")) + : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty) { } internal EvaluatorRequest(int number, int megaBytes, int core) - : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N")) + : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N"), string.Empty) { } internal EvaluatorRequest(int number, int megaBytes, string rack) - : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N")) + : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), string.Empty) { } internal EvaluatorRequest(int number, int megaBytes, int core, string rack) - : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N")) + : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N"), string.Empty) { } internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId) + : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty) + + { + } + + internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, string runtimeName) { Number = number; MemoryMegaBytes = megaBytes; VirtualCore = core; Rack = rack; EvaluatorBatchId = evaluatorBatchId; + RuntimeName = runtimeName; } [DataMember] @@ -75,6 +84,9 @@ namespace Org.Apache.REEF.Driver.Evaluator [DataMember] public string EvaluatorBatchId { get; private set; } + [DataMember] + public string RuntimeName { get; private set; } + internal static EvaluatorRequestBuilder NewBuilder() { return new EvaluatorRequestBuilder(); http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs index f05db51..b59005b 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs @@ -17,12 +17,15 @@ using System; +using Org.Apache.REEF.Common.Runtime; + namespace Org.Apache.REEF.Driver.Evaluator { public sealed class EvaluatorRequestBuilder { private string _evaluatorBatchId; private string _rackName; + private string _runtimeName; [Obsolete("This constructor will be internal after 0.13.")] public EvaluatorRequestBuilder(IEvaluatorRequest request) @@ -32,6 +35,7 @@ namespace Org.Apache.REEF.Driver.Evaluator VirtualCore = request.VirtualCore; _evaluatorBatchId = request.EvaluatorBatchId; _rackName = request.Rack; + _runtimeName = request.RuntimeName; } internal EvaluatorRequestBuilder() @@ -41,6 +45,7 @@ namespace Org.Apache.REEF.Driver.Evaluator MegaBytes = 64; _rackName = string.Empty; _evaluatorBatchId = Guid.NewGuid().ToString("N"); + _runtimeName = string.Empty; } public int Number { get; private set; } @@ -103,13 +108,24 @@ namespace Org.Apache.REEF.Driver.Evaluator } /// <summary> + /// Sets the runtime name for requested evaluators in the same request. The batch of Evaluators requested in the + /// same request will have the same runtime name. + /// </summary> + /// <param name="runtimeName">The runtime name for the Evaluator request.</param> + public EvaluatorRequestBuilder SetRuntimeName(RuntimeName runtimeName) + { + _runtimeName = (runtimeName == RuntimeName.Default) ? string.Empty : runtimeName.ToString(); + return this; + } + + /// <summary> /// Build the EvaluatorRequest. /// </summary> /// <returns></returns> public IEvaluatorRequest Build() { #pragma warning disable 618 - return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId); + return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName); #pragma warning restore 618 } } http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorDescriptor.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorDescriptor.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorDescriptor.cs index 254b66d..697d830 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorDescriptor.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorDescriptor.cs @@ -17,6 +17,7 @@ using Org.Apache.REEF.Common.Catalog; using Org.Apache.REEF.Common.Evaluator; +using Org.Apache.REEF.Common.Runtime; namespace Org.Apache.REEF.Driver.Evaluator { @@ -53,6 +54,6 @@ namespace Org.Apache.REEF.Driver.Evaluator /// <summary> /// name of the runtime that allocated this evaluator /// </summary> - string RuntimeName { get; } + RuntimeName RuntimeName { get; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs index 2e537b4..5a51ad1 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +using Org.Apache.REEF.Common.Runtime; + namespace Org.Apache.REEF.Driver.Evaluator { /// <summary> @@ -47,5 +49,10 @@ namespace Org.Apache.REEF.Driver.Evaluator /// will have the same Batch ID. /// </summary> string EvaluatorBatchId { get; } + + /// <summary> + /// The name of the runtime to allocate teh evaluator on + /// </summary> + string RuntimeName { get; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj index d6c845a..7880292 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj +++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj @@ -30,6 +30,7 @@ under the License. </PropertyGroup> <Import Project="$(SolutionDir)\build.props" /> <ItemGroup> + <Reference Include="Microsoft.Hadoop.Avro, Version=1.5.6.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL" /> <Reference Include="protobuf-net"> <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath> </Reference> @@ -38,6 +39,8 @@ under the License. <Reference Include="System.Runtime.Serialization" /> </ItemGroup> <ItemGroup> + <Compile Include="Bridge\Avro\DefinedRuntimes.cs" /> + <Compile Include="Bridge\Avro\DefinedRuntimesSerializer.cs" /> <Compile Include="Bridge\BridgeConfigurationProvider.cs" /> <Compile Include="Bridge\BridgeHandlerManager.cs" /> <Compile Include="Bridge\BridgeLogger.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Driver/packages.config ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/packages.config b/lang/cs/Org.Apache.REEF.Driver/packages.config index 75512ca..086bedd 100644 --- a/lang/cs/Org.Apache.REEF.Driver/packages.config +++ b/lang/cs/Org.Apache.REEF.Driver/packages.config @@ -18,6 +18,8 @@ specific language governing permissions and limitations under the License. --> <packages> + <package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45" /> + <package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" /> <package id="protobuf-net" version="2.0.0.668" targetFramework="net45" /> <package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" developmentDependency="true" /> </packages> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Evaluator.Tests/app.config ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/app.config b/lang/cs/Org.Apache.REEF.Evaluator.Tests/app.config new file mode 100644 index 0000000..39d3857 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/app.config @@ -0,0 +1,29 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<configuration> + <runtime> + <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1"> + <dependentAssembly> + <assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-7.0.0.0" newVersion="7.0.0.0" /> + </dependentAssembly> + </assemblyBinding> + </runtime> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs index aaa296d..7c8ef7c 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs @@ -217,8 +217,9 @@ namespace Org.Apache.REEF.Tests.Functional lines = File.ReadAllLines(GetLogFile(fileName, subfolder, testFolder)); break; } - catch (Exception) + catch (Exception e) { + Console.Write(e.ToString()); Thread.Sleep(SleepTime); } } http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingDefaultRuntimeName.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingDefaultRuntimeName.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingDefaultRuntimeName.cs new file mode 100644 index 0000000..a110316 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingDefaultRuntimeName.cs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Globalization; +using Org.Apache.REEF.Common.Runtime; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Driver +{ + public sealed class EvaluatorRequestingDriverSpecifyingDefaultRuntimeName : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IRunningTask> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(EvaluatorRequestingDriver)); + + private readonly IEvaluatorRequestor _evaluatorRequestor; + + [Inject] + public EvaluatorRequestingDriverSpecifyingDefaultRuntimeName(IEvaluatorRequestor evaluatorRequestor) + { + _evaluatorRequestor = evaluatorRequestor; + } + + public void OnNext(IDriverStarted value) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "ondriver.start {0}", value.StartTime)); + var request = + _evaluatorRequestor.NewBuilder() + .SetNumber(1) + .SetMegabytes(512) + .SetCores(2) + .SetRackName("WonderlandRack") + .SetEvaluatorBatchId("TestEvaluator") + .SetRuntimeName(RuntimeName.Default) + .Build(); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "submitting evaluator request")); + _evaluatorRequestor.Submit(request); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "evaluator request submitted")); + } + + public void OnNext(IAllocatedEvaluator eval) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received evaluator. Runtime Name: {0}.", eval.GetEvaluatorDescriptor().RuntimeName)); + eval.Dispose(); + } + + public void OnNext(IRunningTask runningTask) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received runing task. Runtime Name: {0}", runningTask.ActiveContext.EvaluatorDescriptor.RuntimeName)); + } + + public void OnError(Exception error) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "On error: {0}", error)); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingInvalidRuntimeName.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingInvalidRuntimeName.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingInvalidRuntimeName.cs new file mode 100644 index 0000000..a7564db --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingInvalidRuntimeName.cs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Globalization; +using Org.Apache.REEF.Common.Runtime; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Driver +{ + public sealed class EvaluatorRequestingDriverSpecifyingInvalidRuntimeName : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IRunningTask> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(EvaluatorRequestingDriver)); + + private readonly IEvaluatorRequestor _evaluatorRequestor; + + [Inject] + public EvaluatorRequestingDriverSpecifyingInvalidRuntimeName(IEvaluatorRequestor evaluatorRequestor) + { + _evaluatorRequestor = evaluatorRequestor; + } + + public void OnNext(IDriverStarted value) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "ondriver.start {0}", value.StartTime)); + var request = + _evaluatorRequestor.NewBuilder() + .SetNumber(1) + .SetMegabytes(512) + .SetCores(2) + .SetRackName("WonderlandRack") + .SetEvaluatorBatchId("TestEvaluator") + .SetRuntimeName(RuntimeName.Yarn) + .Build(); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "submitting evaluator request")); + _evaluatorRequestor.Submit(request); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "evaluator request submitted")); + } + + public void OnNext(IAllocatedEvaluator eval) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received evaluator. Runtime Name: {0}.", eval.GetEvaluatorDescriptor().RuntimeName)); + eval.Dispose(); + } + + public void OnNext(IRunningTask value) + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "On error: {0}", error)); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingRuntimeName.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingRuntimeName.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingRuntimeName.cs new file mode 100644 index 0000000..e90cee2 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriverSpecifyingRuntimeName.cs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Diagnostics; +using System.Globalization; +using Org.Apache.REEF.Common.Runtime; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Driver +{ + public sealed class EvaluatorRequestingDriverSpecifyingRuntimeName : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IRunningTask> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(EvaluatorRequestingDriver)); + + private readonly IEvaluatorRequestor _evaluatorRequestor; + + [Inject] + public EvaluatorRequestingDriverSpecifyingRuntimeName(IEvaluatorRequestor evaluatorRequestor) + { + _evaluatorRequestor = evaluatorRequestor; + } + + public void OnNext(IDriverStarted value) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "ondriver.start {0}", value.StartTime)); + Debugger.Break(); + var request = + _evaluatorRequestor.NewBuilder() + .SetNumber(1) + .SetMegabytes(512) + .SetCores(2) + .SetRackName("WonderlandRack") + .SetEvaluatorBatchId("TestEvaluator") + .SetRuntimeName(RuntimeName.Local) + .Build(); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "submitting evaluator request")); + _evaluatorRequestor.Submit(request); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "evaluator request submitted")); + } + + public void OnNext(IAllocatedEvaluator eval) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received evaluator. Runtime Name: {0}.", eval.GetEvaluatorDescriptor().RuntimeName)); + eval.Dispose(); + } + + public void OnNext(IRunningTask runningTask) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received runing task. Runtime Name: {0}", runningTask.ActiveContext.EvaluatorDescriptor.RuntimeName)); + } + + public void OnError(Exception error) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "On error: {0}", error)); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs index c18e3b4..6e39c30 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs @@ -15,9 +15,13 @@ // specific language governing permissions and limitations // under the License. +using System; + using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Driver.Defaults; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.Network.Naming; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; @@ -33,27 +37,75 @@ namespace Org.Apache.REEF.Tests.Functional.Driver public class RuntimeNameTest : ReefFunctionalTest { /// <summary> - /// This is to test DriverTestStartHandler. No evaluator and tasks are involved. + /// Validates that runtime name is propagated to c#. /// </summary> [Fact] [Trait("Priority", "1")] [Trait("Category", "FunctionalGated")] [Trait("Description", "Test TestRuntimeName. Validates that runtime name is propagated to c#")] //// TODO[JIRA REEF-1184]: add timeout 180 sec - public void TestRuntimeName() + public void TestRuntimeNameWithoutSpecifying() { string testFolder = DefaultRuntimeFolder + TestId; - TestRun(DriverConfigurationsWithEvaluatorRequest(), typeof(EvaluatorRequestingDriver), 1, "EvaluatorRequestingDriver", "local", testFolder); + TestRun(DriverConfigurationsWithEvaluatorRequest(GenericType<EvaluatorRequestingDriver>.Class), typeof(EvaluatorRequestingDriver), 1, "EvaluatorRequestingDriver", "local", testFolder); ValidateMessageSuccessfullyLoggedForDriver("Runtime Name: Local", testFolder, 2); CleanUp(testFolder); } - public IConfiguration DriverConfigurationsWithEvaluatorRequest() + /// <summary> + /// Validates that runtime name is propagated to c#. + /// </summary> + [Fact] + [Trait("Priority", "1")] + [Trait("Category", "FunctionalGated")] + [Trait("Description", "Test EvaluatorRequestingDriverSpecifyingRuntimeName. Validates that runtime name is propagated to c#, when specified during submission")] + //// TODO[JIRA REEF-1184]: add timeout 180 sec + public void TestRuntimeNameSpecifyingValidName() + { + string testFolder = DefaultRuntimeFolder + TestId; + TestRun(DriverConfigurationsWithEvaluatorRequest(GenericType<EvaluatorRequestingDriverSpecifyingRuntimeName>.Class), typeof(EvaluatorRequestingDriverSpecifyingRuntimeName), 1, "EvaluatorRequestingDriverSpecifyingRuntimeName", "local", testFolder); + ValidateMessageSuccessfullyLoggedForDriver("Runtime Name: Local", testFolder, 1); + } + + /// <summary> + /// Validates that runtime name is propagated to c#. + /// </summary> + [Fact] + [Trait("Priority", "1")] + [Trait("Category", "FunctionalGated")] + [Trait("Description", "Test TestRuntimeNameSpecifyingInvalidName. Validates that exception is thrown on c# side when invalid runtime name is specified")] + //// TODO[JIRA REEF-1184]: add timeout 180 sec + public void TestRuntimeNameSpecifyingInvalidName() + { + string testFolder = DefaultRuntimeFolder + TestId; + TestRun(DriverConfigurationsWithEvaluatorRequest(GenericType<EvaluatorRequestingDriverSpecifyingInvalidRuntimeName>.Class), typeof(EvaluatorRequestingDriverSpecifyingInvalidRuntimeName), 1, "EvaluatorRequestingDriverSpecifyingInvalidRunitmeName", "local", testFolder); + ValidateMessageSuccessfullyLoggedForDriver("System.ArgumentException: Requested runtime Yarn is not in the defined runtimes list Local", testFolder, 1); + CleanUp(testFolder); + } + + /// <summary> + /// Validates that runtime name is propagated to c#, when default name is specified. + /// </summary> + [Fact] + [Trait("Priority", "1")] + [Trait("Category", "FunctionalGated")] + [Trait("Description", "Test TestRuntimeNameSpecifyingDefaultName. Validates that runtime name is propagated to c#")] + //// TODO[JIRA REEF-1184]: add timeout 180 sec + public void TestRuntimeNameSpecifyingDefaultName() + { + string testFolder = DefaultRuntimeFolder + TestId; + TestRun(DriverConfigurationsWithEvaluatorRequest(GenericType<EvaluatorRequestingDriverSpecifyingDefaultRuntimeName>.Class), typeof(EvaluatorRequestingDriverSpecifyingDefaultRuntimeName), 1, "EvaluatorRequestingDriverSpecifyingDefaultRuntimeName", "local", testFolder); + ValidateMessageSuccessfullyLoggedForDriver("Runtime Name: Local", testFolder, 1); + CleanUp(testFolder); + } + + public IConfiguration DriverConfigurationsWithEvaluatorRequest<T>(GenericType<T> type) + where T : IObserver<IAllocatedEvaluator>, IObserver<IDriverStarted>, IObserver<IRunningTask> { IConfiguration driverConfig = DriverConfiguration.ConfigurationModule - .Set(DriverConfiguration.OnDriverStarted, GenericType<EvaluatorRequestingDriver>.Class) - .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<EvaluatorRequestingDriver>.Class) - .Set(DriverConfiguration.OnTaskRunning, GenericType<EvaluatorRequestingDriver>.Class) + .Set(DriverConfiguration.OnDriverStarted, type) + .Set(DriverConfiguration.OnEvaluatorAllocated, type) + .Set(DriverConfiguration.OnTaskRunning, type) .Set(DriverConfiguration.CustomTraceListeners, GenericType<DefaultCustomTraceListener>.Class) .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString()) .Build(); http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 4826d54..0ee261f 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -85,6 +85,9 @@ under the License. <Compile Include="Functional\Driver\DriverTestStartHandler.cs" /> <Compile Include="Functional\FaultTolerant\TestContextStart.cs" /> <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" /> + <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriverSpecifyingDefaultRuntimeName.cs" /> + <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriverSpecifyingInvalidRuntimeName.cs" /> + <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriverSpecifyingRuntimeName.cs" /> <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriver.cs" /> <Compile Include="Functional\Driver\TestDriver.cs" /> <Compile Include="Functional\FaultTolerant\PoisonTest.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java index c2c5fd3..52449c8 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java @@ -31,6 +31,7 @@ import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters; import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters; import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.yarn.driver.RuntimeIdentifier; import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration; import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration; import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix; @@ -96,6 +97,7 @@ final class YarnBootstrapDriverConfigGenerator { .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobSubmissionParameters.getJobId().toString()) .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE) .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0) + .set(YarnDriverConfiguration.RUNTIME_NAMES, RuntimeIdentifier.RUNTIME_NAME) .build(); final AvroAppSubmissionParameters appSubmissionParams = yarnAppSubmissionParams.getSharedAppSubmissionParameters(); http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-bridge-java/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/pom.xml b/lang/java/reef-bridge-java/pom.xml index 941e463..eaa1206 100644 --- a/lang/java/reef-bridge-java/pom.xml +++ b/lang/java/reef-bridge-java/pom.xml @@ -70,6 +70,11 @@ under the License. <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.9</version> + </dependency> </dependencies> <build> @@ -86,6 +91,22 @@ under the License. </pluginManagement> <plugins> <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> + <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <plugin> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-bridge-java/src/main/avro/DefinedRuntimes.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/avro/DefinedRuntimes.avsc b/lang/java/reef-bridge-java/src/main/avro/DefinedRuntimes.avsc new file mode 100644 index 0000000..3e089af --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/avro/DefinedRuntimes.avsc @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + [ +/* + * Defines the schema for the defined runtime names. This avro object is used to pass runtime names + * to the c# + */ + { + "namespace":"org.apache.reef.javabridge.avro", + "type":"record", + "name":"DefinedRuntimes", + "doc":"Defines the schema for the defined runtime names. This avro object is used to pass runtime names to the c#", + "fields":[ + { + "name":"runtimeNames", + "type":{"type":"array", "items":"string"}, + "doc":"defined runtime names" + } + ] + } +] http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java index a046090..0db4388 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java @@ -18,13 +18,19 @@ */ package org.apache.reef.javabridge; +import org.apache.commons.lang.StringUtils; import org.apache.reef.annotations.audience.Interop; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.evaluator.EvaluatorRequest; import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.javabridge.avro.DefinedRuntimes; +import org.apache.reef.javabridge.utils.DefinedRuntimesSerializer; import org.apache.reef.util.logging.LoggingScope; import org.apache.reef.util.logging.LoggingScopeFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -40,6 +46,7 @@ public final class EvaluatorRequestorBridge extends NativeBridge { private final boolean isBlocked; private final EvaluatorRequestor jevaluatorRequestor; private final LoggingScopeFactory loggingScopeFactory; + private final Set<String> definedRuntimes; // accumulate how many evaluators have been submitted through this instance // of EvaluatorRequestorBridge @@ -47,14 +54,20 @@ public final class EvaluatorRequestorBridge extends NativeBridge { public EvaluatorRequestorBridge(final EvaluatorRequestor evaluatorRequestor, final boolean isBlocked, - final LoggingScopeFactory loggingScopeFactory) { + final LoggingScopeFactory loggingScopeFactory, + final Set<String> definedRuntimes) { this.jevaluatorRequestor = evaluatorRequestor; this.clrEvaluatorsNumber = 0; this.isBlocked = isBlocked; this.loggingScopeFactory = loggingScopeFactory; + this.definedRuntimes = definedRuntimes; } - public void submit(final int evaluatorsNumber, final int memory, final int virtualCore, final String rack) { + public void submit(final int evaluatorsNumber, + final int memory, + final int virtualCore, + final String rack, + final String runtimeName) { if (this.isBlocked) { throw new RuntimeException("Cannot request additional Evaluator, this is probably because " + "the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433."); @@ -71,6 +84,7 @@ public final class EvaluatorRequestorBridge extends NativeBridge { .setNumber(evaluatorsNumber) .setMemory(memory) .setNumberOfCores(virtualCore) + .setRuntimeName(runtimeName) .build(); LOG.log(Level.FINE, "submitting evaluator request {0}", request); @@ -85,4 +99,20 @@ public final class EvaluatorRequestorBridge extends NativeBridge { @Override public void close() { } + + public byte[] getDefinedRuntimes(){ + if(LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "Defined Runtimes :" + StringUtils.join(this.definedRuntimes, ',')); + } + + final DefinedRuntimes dr = new DefinedRuntimes(); + final List<CharSequence> runtimeNames = new ArrayList<>(); + for(final String name : this.definedRuntimes) { + runtimeNames.add(name); + } + dr.setRuntimeNames(runtimeNames); + final DefinedRuntimesSerializer drs = new DefinedRuntimesSerializer(); + final byte[] ret = drs.toBytes(dr); + return ret; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java index 0ddb201..89be5f3 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java @@ -26,6 +26,7 @@ import org.apache.reef.driver.evaluator.FailedEvaluator; import org.apache.reef.io.naming.Identifiable; import org.apache.reef.util.logging.LoggingScopeFactory; +import java.util.Set; import java.util.logging.Logger; /** @@ -44,11 +45,13 @@ public final class FailedEvaluatorBridge extends NativeBridge implements Identif final EvaluatorRequestor evaluatorRequestor, final boolean blockedForAdditionalEvaluator, final LoggingScopeFactory loggingScopeFactory, - final ActiveContextBridgeFactory activeContextBridgeFactory) { + final ActiveContextBridgeFactory activeContextBridgeFactory, + final Set<String> definedRuntimes) { this.jfailedEvaluator = failedEvaluator; this.evaluatorId = failedEvaluator.getId(); this.evaluatorRequestorBridge = - new EvaluatorRequestorBridge(evaluatorRequestor, blockedForAdditionalEvaluator, loggingScopeFactory); + new EvaluatorRequestorBridge(evaluatorRequestor, blockedForAdditionalEvaluator, loggingScopeFactory, + definedRuntimes); this.activeContextBridgeFactory = activeContextBridgeFactory; } http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java index 5bb2c2b..c0d1ef5 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java @@ -30,7 +30,9 @@ import org.apache.reef.io.network.naming.NameServer; import org.apache.reef.javabridge.*; import org.apache.reef.driver.restart.DriverRestartCompleted; import org.apache.reef.runtime.common.driver.DriverStatusManager; +import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes; import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.annotations.Unit; import org.apache.reef.util.Optional; import org.apache.reef.util.logging.CLRBufferedLogHandler; @@ -51,10 +53,7 @@ import javax.servlet.http.HttpServletResponse; import java.io.*; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.Logger; @@ -118,6 +117,7 @@ public final class JobDriver { * Logging scope factory that provides LoggingScope. */ private final LoggingScopeFactory loggingScopeFactory; + private final Set<String> definedRuntimes; private BridgeHandlerManager handlerManager = null; private boolean isRestarted = false; @@ -149,7 +149,8 @@ public final class JobDriver { final ActiveContextBridgeFactory activeContextBridgeFactory, final REEFFileNames reefFileNames, final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory, - final CLRProcessFactory clrProcessFactory) { + final CLRProcessFactory clrProcessFactory, + @Parameter(DefinedRuntimes.class) final Set<String> definedRuntimes) { this.clock = clock; this.httpServer = httpServer; this.jobMessageObserver = jobMessageObserver; @@ -163,6 +164,7 @@ public final class JobDriver { this.reefFileNames = reefFileNames; this.localAddressProvider = localAddressProvider; this.clrProcessFactory = clrProcessFactory; + this.definedRuntimes = definedRuntimes; } private void setupBridge(final ClrHandlersInitializer initializer) { @@ -192,7 +194,8 @@ public final class JobDriver { } this.evaluatorRequestorBridge = - new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory); + new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory, + JobDriver.this.definedRuntimes); JobDriver.this.handlerManager = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge); try (final LoggingScope lp = @@ -295,7 +298,7 @@ public final class JobDriver { LOG.log(Level.INFO, message); final FailedEvaluatorBridge failedEvaluatorBridge = new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor, - JobDriver.this.isRestarted, loggingScopeFactory, activeContextBridgeFactory); + JobDriver.this.isRestarted, loggingScopeFactory, activeContextBridgeFactory, JobDriver.this.definedRuntimes); if (isRestartFailed) { NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext( JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/utils/DefinedRuntimesSerializer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/utils/DefinedRuntimesSerializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/utils/DefinedRuntimesSerializer.java new file mode 100644 index 0000000..02c5df7 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/utils/DefinedRuntimesSerializer.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.javabridge.utils; + +import org.apache.avro.io.*; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.reef.javabridge.avro.DefinedRuntimes; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Serializer for MultiRuntimeDefinition. + */ +public final class DefinedRuntimesSerializer { + + private static final String CHARSET_NAME = "UTF-8"; + + /** + * Serializes DefinedRuntimes. + * @param definedRuntimes the Avro object to toString + * @return Serialized avro string + */ + public byte[] toBytes(final DefinedRuntimes definedRuntimes){ + final DatumWriter<DefinedRuntimes> configurationWriter = + new SpecificDatumWriter<>(DefinedRuntimes.class); + try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { + final BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(out, null); + configurationWriter.write(definedRuntimes, binaryEncoder); + binaryEncoder.flush(); + out.flush(); + return out.toByteArray(); + } catch (final IOException e) { + throw new RuntimeException("Unable to serialize DefinedRuntimes", e); + } + } + + /** + * Deserializes avro definition. + * @param serializedDefinedRuntimes serialized definition + * @return Avro object + * @throws IOException + */ + public DefinedRuntimes fromBytes(final byte[] serializedDefinedRuntimes) throws + IOException{ + try(InputStream is = new ByteArrayInputStream(serializedDefinedRuntimes)) { + final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(is, null); + final SpecificDatumReader<DefinedRuntimes> reader = new SpecificDatumReader<>(DefinedRuntimes.class); + final DefinedRuntimes rd = reader.read(null, decoder); + return rd; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/utils/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/utils/package-info.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/utils/package-info.java new file mode 100644 index 0000000..3860ef0 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * utils for clr bridge. + */ +package org.apache.reef.javabridge.utils; http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/DefinedRuntimes.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/DefinedRuntimes.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/DefinedRuntimes.java new file mode 100644 index 0000000..ae6bcd3 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/DefinedRuntimes.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +import java.util.Set; + +/** + * The defined runtimes for the job driver. + */ +@NamedParameter(doc = "Set of defined runtimes.") +public final class DefinedRuntimes implements Name<Set<String>> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java index d3b10b1..2fb198f 100644 --- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java @@ -25,6 +25,7 @@ import org.apache.reef.io.TempFileCreator; import org.apache.reef.io.WorkingDirectoryTempFileCreator; import org.apache.reef.runtime.common.driver.api.*; import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; +import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes; import org.apache.reef.runtime.common.driver.parameters.EvaluatorTimeout; import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; import org.apache.reef.runtime.common.files.RuntimePathProvider; @@ -66,6 +67,11 @@ public final class HDInsightDriverConfiguration extends ConfigurationModuleBuild public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>(); /** + * @see DefinedRuntimes + */ + public static final RequiredParameter<String> RUNTIME_NAMES = new RequiredParameter<>(); + + /** * The client remote identifier. */ public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>(); @@ -104,5 +110,6 @@ public final class HDInsightDriverConfiguration extends ConfigurationModuleBuild .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK) .bindImplementation(RuntimeClasspathProvider.class, HDInsightClasspathProvider.class) .bindImplementation(RuntimePathProvider.class, HDInsightJVMPathProvider.class) + .bindSetEntry(DefinedRuntimes.class, RUNTIME_NAMES) .build(); } http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java index 89658a3..c1fff73 100644 --- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java @@ -20,6 +20,7 @@ package org.apache.reef.runtime.hdinsight.client; import org.apache.reef.runtime.common.client.DriverConfigurationProvider; import org.apache.reef.runtime.common.parameters.JVMHeapSlack; +import org.apache.reef.runtime.hdinsight.driver.RuntimeIdentifier; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Configurations; import org.apache.reef.tang.annotations.Parameter; @@ -28,8 +29,6 @@ import javax.inject.Inject; import java.net.URI; -import static org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration.*; - /** * Default driver configuration provider for HDInsight. */ @@ -52,6 +51,7 @@ final class HDInsightDriverConfigurationProviderImpl implements DriverConfigurat .set(HDInsightDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId) .set(HDInsightDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobFolder.toString()) .set(HDInsightDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack) + .set(HDInsightDriverConfiguration.RUNTIME_NAMES, RuntimeIdentifier.RUNTIME_NAME) .build(); return Configurations.merge( http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/driver/RuntimeIdentifier.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/driver/RuntimeIdentifier.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/driver/RuntimeIdentifier.java new file mode 100644 index 0000000..3edf31c --- /dev/null +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/driver/RuntimeIdentifier.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.hdinsight.driver; + +import org.apache.reef.annotations.audience.Private; + +/** + * Runtime Identifier Implementation. + */ +@Private +public final class RuntimeIdentifier { + /** + * Same value is defined on the C# side in the Org.Apache.REEF.Common.Runtime.RuntimeName. + */ + public static final String RUNTIME_NAME = "HDInsight"; + + private RuntimeIdentifier() { } +} http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/driver/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/driver/package-info.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/driver/package-info.java new file mode 100644 index 0000000..febeb5a --- /dev/null +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/driver/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * HDInsight support for REEF. + */ +package org.apache.reef.runtime.hdinsight.driver; http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java index 4001cfe..e3d19d8 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java @@ -23,6 +23,7 @@ import org.apache.reef.runtime.common.parameters.JVMHeapSlack; import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators; import org.apache.reef.runtime.local.client.parameters.RackNames; import org.apache.reef.runtime.local.driver.LocalDriverConfiguration; +import org.apache.reef.runtime.local.driver.RuntimeIdentifier; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Configurations; import org.apache.reef.tang.annotations.Parameter; @@ -59,7 +60,8 @@ final class LocalDriverConfigurationProviderImpl implements DriverConfigurationP .set(LocalDriverConfiguration.ROOT_FOLDER, jobFolder.getPath()) .set(LocalDriverConfiguration.JVM_HEAP_SLACK, this.jvmHeapSlack) .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId) - .set(LocalDriverConfiguration.JOB_IDENTIFIER, jobId); + .set(LocalDriverConfiguration.JOB_IDENTIFIER, jobId) + .set(LocalDriverConfiguration.RUNTIME_NAMES, RuntimeIdentifier.RUNTIME_NAME); for (final String rackName : rackNames) { configModule = configModule.set(LocalDriverConfiguration.RACK_NAMES, rackName); } http://git-wip-us.apache.org/repos/asf/reef/blob/88ddc3a4/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java index 3236680..e27fef1 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java @@ -22,12 +22,14 @@ import org.apache.reef.client.parameters.DriverConfigurationProviders; import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration; import org.apache.reef.runtime.common.client.DriverConfigurationProvider; import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; +import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes; import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; import org.apache.reef.runtime.common.parameters.JVMHeapSlack; import org.apache.reef.runtime.local.LocalClasspathProvider; import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators; import org.apache.reef.runtime.local.client.parameters.RackNames; import org.apache.reef.runtime.local.client.parameters.RootFolder; +import org.apache.reef.runtime.local.driver.RuntimeIdentifier; import org.apache.reef.tang.ConfigurationProvider; import org.apache.reef.tang.formats.*; @@ -85,6 +87,7 @@ public class LocalRuntimeConfiguration extends ConfigurationModuleBuilder { .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK) .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS) .bindSetEntry(RackNames.class, RACK_NAMES) + .bindSetEntry(DefinedRuntimes.class, RuntimeIdentifier.RUNTIME_NAME) .build();
