Repository: reef Updated Branches: refs/heads/master 765d7f616 -> 55cde7d62
[REEF-1430] Validate Task Message Send failure => FailedEvaluator Event This change: * updates HeartBeatManager.OnNext(Alarm) to properly handle exceptions thrown while getting or sending the evaluator heartbeat. * adds a test to verify that a task message send failure in the heartbeat causes a FailedEvaluator event on the driver. JIRA: [REEF-1430](https://issues.apache.org/jira/browse/REEF-1430) Pull request: This closes #1202 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/55cde7d6 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/55cde7d6 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/55cde7d6 Branch: refs/heads/master Commit: 55cde7d623ae2096248fea58e84ac211a04de2ad Parents: 765d7f6 Author: Mariia Mykhailova <[email protected]> Authored: Fri Jul 8 16:50:24 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Mon Dec 12 17:57:57 2016 -0800 ---------------------------------------------------------------------- .../Runtime/Evaluator/EvaluatorRuntime.cs | 2 +- .../Runtime/Evaluator/HeartBeatManager.cs | 15 +- .../User/SendTaskMessageExceptionTest.cs | 165 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 1 + 4 files changed, 179 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/55cde7d6/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs index 0144573..0d26a64 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs @@ -257,7 +257,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } - private void OnException(Exception e) + internal void 
OnException(Exception e) { lock (_heartBeatManager) { http://git-wip-us.apache.org/repos/asf/reef/blob/55cde7d6/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs index 889c67c..5800081 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs @@ -287,9 +287,18 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && EvaluatorRuntime.State == State.RUNNING) { - EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto(); - LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString())); - Send(evaluatorHeartbeatProto); + try + { + EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto(); + LOGGER.Log(Level.Verbose, + string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. 
{1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus)); + Send(evaluatorHeartbeatProto); + } + catch (Exception e) + { + Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); + EvaluatorRuntime.OnException(e); + } } else { http://git-wip-us.apache.org/repos/asf/reef/blob/55cde7d6/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/SendTaskMessageExceptionTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/SendTaskMessageExceptionTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/SendTaskMessageExceptionTest.cs new file mode 100644 index 0000000..8d6bf6e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/SendTaskMessageExceptionTest.cs @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Tests.Functional.Bridge.Exceptions; +using Org.Apache.REEF.Tests.Functional.Common; +using Org.Apache.REEF.Tests.Functional.Common.Task; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Failure.User +{ + /// <summary> + /// This class contains a test that tests the behavior upon throwing an Exception when + /// sending a Task Message from the Evaluator's ITaskMessageSource. + /// </summary> + [Collection("FunctionalTests")] + public sealed class SendTaskMessageExceptionTest : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(SendTaskMessageExceptionTest)); + + private static readonly string TaskId = "TaskId"; + private static readonly string ExpectedExceptionMessage = "ExpectedExceptionMessage"; + private static readonly string ReceivedFailedEvaluator = "ReceivedFailedEvaluator"; + + /// <summary> + /// This test validates that a failure in the ITaskMessageSource results in a FailedEvaluator. 
+ /// </summary> + [Fact] + public void TestSendTaskMessageException() + { + string testFolder = DefaultRuntimeFolder + TestId; + + TestRun(DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<TestSendTaskMessageExceptionDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<TestSendTaskMessageExceptionDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<TestSendTaskMessageExceptionDriver>.Class) + .Set(DriverConfiguration.OnContextFailed, GenericType<TestSendTaskMessageExceptionDriver>.Class) + .Build(), + typeof(TestSendTaskMessageExceptionDriver), + 1, + "SendTaskMessageExceptionTest", + "local", + testFolder); + + ValidateSuccessForLocalRuntime(0, 0, 1, testFolder); + ValidateMessageSuccessfullyLoggedForDriver(ReceivedFailedEvaluator, testFolder); + CleanUp(testFolder); + } + + private sealed class TestSendTaskMessageExceptionDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IFailedContext>, + IObserver<IFailedEvaluator> + { + private readonly IEvaluatorRequestor _requestor; + + [Inject] + private TestSendTaskMessageExceptionDriver(IEvaluatorRequestor requestor) + { + _requestor = requestor; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + value.SubmitTask( + TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, TaskId) + .Set(TaskConfiguration.Task, GenericType<TestTask>.Class) + .Set(TaskConfiguration.OnSendMessage, GenericType<SendTaskMessageExceptionHandler>.Class) + .Build()); + } + + /// <summary> + /// Throwing an Exception in a task message handler will result in a Failed Evaluator. 
+ /// </summary> + public void OnNext(IFailedEvaluator value) + { + Assert.Equal(1, value.FailedContexts.Count); + Assert.NotNull(value.EvaluatorException.InnerException); + Assert.True(value.EvaluatorException.InnerException is TestSerializableException, + "Unexpected type of evaluator exception: " + value.EvaluatorException.InnerException.GetType()); + Assert.Equal(ExpectedExceptionMessage, value.EvaluatorException.InnerException.Message); + + Logger.Log(Level.Info, ReceivedFailedEvaluator); + } + + public void OnNext(IFailedContext value) + { + throw new Exception("The Driver does not expect a Failed Context message."); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + /// <summary> + /// A Task message source that throws an Exception. + /// </summary> + private sealed class SendTaskMessageExceptionHandler : ITaskMessageSource + { + private int counter; + + [Inject] + private SendTaskMessageExceptionHandler() + { + } + + public Optional<TaskMessage> Message + { + get + { + counter++; + if (counter == 2) + { + throw new TestSerializableException(ExpectedExceptionMessage); + } + return Optional<TaskMessage>.Empty(); + } + } + } + + private sealed class TestTask : WaitingTask + { + [Inject] + private TestTask(EventMonitor eventMonitor) : base(eventMonitor, "WaitingTask started") + { + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/55cde7d6/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 0f57571..d483c7f 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -87,6 +87,7 @@ under the License. 
<Compile Include="Functional\Common\Task\NullTask.cs" /> <Compile Include="Functional\Failure\User\ContextStopExceptionTest.cs" /> <Compile Include="Functional\Common\EventMonitor.cs" /> + <Compile Include="Functional\Failure\User\SendTaskMessageExceptionTest.cs" /> <Compile Include="Functional\Failure\User\SendContextMessageExceptionTest.cs" /> <Compile Include="Functional\Failure\User\ServiceConstructorExceptionTest.cs" /> <Compile Include="Functional\Failure\User\ReceiveContextMessageExceptionTest.cs" />
