Repository: reef Updated Branches: refs/heads/master ca358730c -> 9c9afc643
[REEF-1043] Add runtime name to evaluator descriptor c# This issue addressed by * Adding runtime name to the interface * Parsing runtime name from the serialized descriptor received from teh bridge JIRA: [REEF-983](https://issues.apache.org/jira/browse/REEF-1043) Pull Request: Closes #713 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/9c9afc64 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/9c9afc64 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/9c9afc64 Branch: refs/heads/master Commit: 9c9afc6438eb8dde643abc4f1fddaa5a73987c86 Parents: ca35873 Author: Boris Shulman <[email protected]> Authored: Mon Dec 7 16:04:50 2015 -0800 Committer: Andrew Chung <[email protected]> Committed: Tue Dec 8 14:26:23 2015 -0800 ---------------------------------------------------------------------- .../Evaluator/EvaluatorDescriptorImpl.cs | 19 +++- .../Evaluator/IEvaluatorDescriptor.cs | 5 + .../Functional/ReefFunctionalTest.cs | 29 +++++- .../RuntimeName/EvaluatorRequestingDriver.cs | 99 ++++++++++++++++++++ .../Functional/RuntimeName/RuntimeNameTask.cs | 51 ++++++++++ .../Functional/RuntimeName/RuntimeNameTest.cs | 83 ++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 5 +- 7 files changed, 284 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/9c9afc64/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorDescriptorImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorDescriptorImpl.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorDescriptorImpl.cs index 9d86cb3..07dfb17 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorDescriptorImpl.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorDescriptorImpl.cs @@ -39,15 +39,17 @@ namespace Org.Apache.REEF.Driver.Evaluator private readonly int _megaBytes; private readonly INodeDescriptor _nodeDescriptor; private readonly string _rack; + private readonly string _runtimeName; - internal EvaluatorDescriptorImpl(INodeDescriptor nodeDescriptor, EvaluatorType type, int megaBytes, int core, - string rack = DefaultRackName) + // TODO[JIRA REEF-1054]: make runtimeName not optional + internal EvaluatorDescriptorImpl(INodeDescriptor nodeDescriptor, EvaluatorType type, int megaBytes, int core, string rack = DefaultRackName, string runtimeName = "") { _nodeDescriptor = nodeDescriptor; _evaluatorType = type; _megaBytes = megaBytes; _core = core; _rack = rack; + _runtimeName = runtimeName; } /// <summary> @@ -68,6 +70,13 @@ namespace Org.Apache.REEF.Driver.Evaluator } settings.Add(pair[0], pair[1]); } + + // TODO[JIRA REEF-1054]: make runtimeName not optional + string runtimeName; + if (!settings.TryGetValue("RuntimeName", out runtimeName)) + { + Exceptions.Throw(new ArgumentException("cannot find RuntimeName entry"), LOGGER); + } string ipAddress; if (!settings.TryGetValue("IP", out ipAddress)) { @@ -108,6 +117,7 @@ namespace Org.Apache.REEF.Driver.Evaluator _evaluatorType = EvaluatorType.CLR; _megaBytes = memoryInMegaBytes; _core = vCore; + _runtimeName = runtimeName; } public INodeDescriptor NodeDescriptor @@ -135,6 +145,11 @@ namespace Org.Apache.REEF.Driver.Evaluator get { return _rack; } } + public string RuntimeName + { + get { return _runtimeName; } + } + public override bool Equals(object obj) { var other = obj as EvaluatorDescriptorImpl; http://git-wip-us.apache.org/repos/asf/reef/blob/9c9afc64/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorDescriptor.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorDescriptor.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorDescriptor.cs index 28fffe7..7ef9004 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorDescriptor.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorDescriptor.cs @@ -51,5 +51,10 @@ namespace Org.Apache.REEF.Driver.Evaluator /// rack on which the evaluator was allocated /// </summary> string Rack { get; } + + /// <summary> + /// name of the runtime that allocated this evaluator + /// </summary> + string RuntimeName { get; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/9c9afc64/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs index d466c4b..a2db605 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs @@ -171,11 +171,32 @@ namespace Org.Apache.REEF.Tests.Functional } } - protected void ValidateMessageSuccessfullyLogged(string message, string testFolder) + protected void ValidateMessageSuccessfullyLogged(string message, string testFolder, int numberOfoccurances = 1) { - string[] lines = File.ReadAllLines(GetLogFile(_stdout, testFolder)); - string[] successIndicators = lines.Where(s => s.Contains(message)).ToArray(); - Assert.IsTrue(successIndicators.Any()); + string[] lines = null; + for (int i = 0; i < 60; i++) + { + try + { + lines = File.ReadAllLines(GetLogFile(_stdout, testFolder)); + break; + } + catch (Exception) + { + Thread.Sleep(SleepTime); + } + } + + if (lines != null) + { + string[] successIndicators = lines.Where(s => s.Contains(message)).ToArray(); + Assert.AreEqual(numberOfoccurances, successIndicators.Count()); + } + else + { + Console.WriteLine("Cannot read from log file"); + Assert.Fail(); + } } protected void PeriodicUploadLog(object source, ElapsedEventArgs e) http://git-wip-us.apache.org/repos/asf/reef/blob/9c9afc64/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriver.cs new file mode 100644 index 0000000..9d6dd99 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/EvaluatorRequestingDriver.cs @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Globalization; + +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Tests.Functional.Messaging; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Driver +{ + public sealed class EvaluatorRequestingDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IRunningTask> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(EvaluatorRequestingDriver)); + + private readonly IEvaluatorRequestor _evaluatorRequestor; + + [Inject] + public EvaluatorRequestingDriver(IEvaluatorRequestor evaluatorRequestor) + { + _evaluatorRequestor = evaluatorRequestor; + } + + public void OnNext(IDriverStarted value) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "ondriver.start {0}", value.StartTime)); + var request = + _evaluatorRequestor.NewBuilder() + .SetNumber(1) + .SetMegabytes(512) + .SetCores(2) + .SetRackName("WonderlandRack") + .SetEvaluatorBatchId("TestEvaluator") + .Build(); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "submitting evaluator request")); + _evaluatorRequestor.Submit(request); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "evaluator request submitted")); + } + + public void OnNext(IAllocatedEvaluator eval) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received evaluator. Runtime Name: {0}.", eval.GetEvaluatorDescriptor().RuntimeName)); + string taskId = "Task_" + eval.Id; + + IConfiguration contextConfiguration = ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, taskId) + .Build(); + + IConfiguration taskConfiguration = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, taskId) + .Set(TaskConfiguration.Task, GenericType<RuntimeNameTask>.Class) + .Build(); + + eval.SubmitContextAndTask(contextConfiguration, taskConfiguration); + } + + public void OnNext(IRunningTask runningTask) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received runing task. Runtime Name: {0}", runningTask.ActiveContext.EvaluatorDescriptor.RuntimeName)); + } + + public void OnError(Exception error) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "On error: {0}", error)); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/9c9afc64/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTask.cs new file mode 100644 index 0000000..09addf0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTask.cs @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Threading; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Messaging +{ + public sealed class RuntimeNameTask : ITask + { + public const string MessageSend = "MESSAGE:TASK"; + + private static readonly Logger Logger = Logger.GetLogger(typeof(RuntimeNameTask)); + + [Inject] + public RuntimeNameTask() + { + } + + public byte[] Call(byte[] memento) + { + Logger.Log(Level.Info, "Hello, CLR Task!"); + Thread.Sleep(5 * 1000); + return null; + } + + public void Dispose() + { + Logger.Log(Level.Info, "Msg disposed."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/9c9afc64/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs new file mode 100644 index 0000000..c756648 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Driver.Defaults; +using Org.Apache.REEF.Network.Naming; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Tests.Functional.Messaging; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Driver +{ + [TestClass] + public class RuntimeNameTest : ReefFunctionalTest + { + [TestInitialize] + public void TestSetup() + { + CleanUp(); + } + + [TestCleanup] + public void TestCleanup() + { + CleanUp(); + } + + /// <summary> + /// This is to test DriverTestStartHandler. No evaluator and tasks are involved. + /// </summary> + [TestMethod, Priority(1), TestCategory("FunctionalGated")] + [Description("Test TestRuntimeName. Validates that runtime name is propagated to c#")] + [DeploymentItem(@".")] + [Timeout(180 * 1000)] + public void TestRuntimeName() + { + string testFolder = DefaultRuntimeFolder + TestNumber++; + CleanUp(testFolder); + TestRun(DriverConfigurationsWithEvaluatorRequest(), typeof(EvaluatorRequestingDriver), 1, "EvaluatorRequestingDriver", "local", testFolder); + ValidateMessageSuccessfullyLogged("Runtime Name: Local", testFolder, 2); + CleanUp(testFolder); + } + + public IConfiguration DriverConfigurationsWithEvaluatorRequest() + { + IConfiguration driverConfig = DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<EvaluatorRequestingDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<EvaluatorRequestingDriver>.Class) + .Set(DriverConfiguration.OnTaskRunning, GenericType<EvaluatorRequestingDriver>.Class) + .Set(DriverConfiguration.CustomTraceListeners, GenericType<DefaultCustomTraceListener>.Class) + .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Build(); + + IConfiguration taskConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(RuntimeNameTask).Assembly.GetName().Name) + .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(NameClient).Assembly.GetName().Name) + .Build(); + + return Configurations.Merge(driverConfig, taskConfig); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/9c9afc64/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index b79f150..5bcb19b 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -97,6 +97,7 @@ under the License. <Compile Include="Functional\Bridge\TestFailedTaskEventHandler.cs" /> <Compile Include="Functional\Bridge\TestSimpleEventHandlers.cs" /> <Compile Include="Functional\Driver\DriverTestStartHandler.cs" /> + <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriver.cs" /> <Compile Include="Functional\Driver\TestDriver.cs" /> <Compile Include="Functional\Messaging\MessageDriver.cs" /> <Compile Include="Functional\Messaging\MessageTask.cs" /> @@ -106,6 +107,8 @@ under the License. <Compile Include="Functional\Group\PipelinedBroadcastReduceTest.cs" /> <Compile Include="Functional\Group\ScatterReduceTest.cs" /> <Compile Include="Functional\ReefFunctionalTest.cs" /> + <Compile Include="Functional\RuntimeName\RuntimeNameTask.cs" /> + <Compile Include="Functional\RuntimeName\RuntimeNameTest.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Utility\TestDriverConfigGenerator.cs" /> <Compile Include="Utility\TestExceptions.cs" /> @@ -189,4 +192,4 @@ under the License. <Target Name="BeforeBuild"> </Target> --> -</Project> +</Project> \ No newline at end of file
