Repository: reef Updated Branches: refs/heads/master d34441ce6 -> 02bfc2f4f
[REEF-1258] Populate Task Exception data properly This addressed the issue by * Adding support for Exception serialization. * Filling in FailedTask.AsError with the deserialized Task Exception (replacing the FailedTask.Cause property). * Adding tests for Exception serialization. JIRA: [REEF-1258](https://issues.apache.org/jira/browse/REEF-1258) Pull Request: This closes #971 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/02bfc2f4 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/02bfc2f4 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/02bfc2f4 Branch: refs/heads/master Commit: 02bfc2f4f6cdae4069af1b056a2cf50b6ce03b36 Parents: d34441c Author: Andrew Chung <[email protected]> Authored: Tue Apr 26 15:19:36 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu Apr 28 13:49:01 2016 -0700 ---------------------------------------------------------------------- .../Exceptions/NonSerializableTaskException.cs | 40 ++++++ .../Org.Apache.REEF.Common.csproj | 1 + .../Runtime/Evaluator/Task/TaskRuntime.cs | 10 +- .../Runtime/Evaluator/Task/TaskStatus.cs | 29 ++++- .../Bridge/Events/FailedTask.cs | 39 +++++- .../Org.Apache.REEF.Driver.csproj | 1 + .../Task/JavaTaskException.cs | 32 +++++ .../Bridge/TestFailedTaskEventHandler.cs | 128 +++++++++++++++++-- 8 files changed, 255 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/02bfc2f4/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableTaskException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableTaskException.cs b/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableTaskException.cs new file mode 100644 index 0000000..76543b4 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableTaskException.cs @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) 
under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Runtime.Serialization; + +namespace Org.Apache.REEF.Common.Exceptions +{ + /// <summary> + /// Encapsulates <see cref="Exception#ToString"/> for an Exception from a + /// REEF Task that was not Serializable to the Job Driver. + /// </summary> + [Serializable] + public sealed class NonSerializableTaskException : Exception + { + public NonSerializableTaskException(string message, SerializationException serializationException) + : base(message, serializationException) + { + } + + public NonSerializableTaskException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/02bfc2f4/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index 6cf177c..b754583 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -107,6 +107,7 @@ under the License. 
<Compile Include="Events\IContextStart.cs" /> <Compile Include="Events\IContextStop.cs" /> <Compile Include="Exceptions\JobException.cs" /> + <Compile Include="Exceptions\NonSerializableTaskException.cs" /> <Compile Include="Files\PathUtilities.cs" /> <Compile Include="IContextAndTaskSubmittable.cs" /> <Compile Include="IContextSubmittable.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/02bfc2f4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 010c0f9..970a36c 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -125,9 +125,17 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task if (Logger.CustomLevel >= resultLogLevel && result != null && result.Length > 0) { - Logger.Log(resultLogLevel, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); + Logger.Log(resultLogLevel, + "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); } } + catch (Exception) + { + // TODO[JIRA REEF-1364]: Properly handle Exceptions and send a message to the Driver. 
+ Logger.Log(Level.Error, "Received uncaught System Exception, force shutting down the Evaluator."); + + Environment.Exit(1); + } finally { if (_userTask != null) http://git-wip-us.apache.org/repos/asf/reef/blob/02bfc2f4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs index 909fc78..f59184c 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs @@ -18,8 +18,12 @@ using System; using System.Collections.Generic; using System.Globalization; +using System.IO; +using System.Runtime.Serialization; +using System.Runtime.Serialization.Formatters.Binary; using Org.Apache.REEF.Common.Avro; using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Exceptions; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Tang.Annotations; @@ -33,6 +37,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStatus)); private readonly object _stateLock = new object(); + private readonly BinaryFormatter _binaryFormatter = new BinaryFormatter(); + private readonly TaskLifeCycle _taskLifeCycle; private readonly IHeartBeatManager _heartBeatManager; private readonly Optional<ISet<ITaskMessageSource>> _evaluatorMessageSources; @@ -246,12 +252,20 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } else if (_lastException.IsPresent()) { + byte[] error; + try + { + error = SerializeException(_lastException.Value); + } + catch (SerializationException se) + { + error = SerializeException(new NonSerializableTaskException(_lastException.Value.ToString(), se)); + } + var avroFailedTask = new AvroFailedTask { identifier 
= _taskId, - - // TODO[JIRA REEF-1258]: Serialize Exception properly. - cause = new byte[0], + cause = error, data = ByteUtilities.StringToByteArrays(_lastException.Value.ToString()), message = _lastException.Value.Message }; @@ -274,6 +288,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } } + private byte[] SerializeException(Exception ex) + { + using (var memStream = new MemoryStream()) + { + _binaryFormatter.Serialize(memStream, ex); + return memStream.ToArray(); + } + } + private static bool IsLegalStateTransition(TaskState? from, TaskState to) { if (from == null) http://git-wip-us.apache.org/repos/asf/reef/blob/02bfc2f4/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs index 56b6a17..31d9ca9 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs @@ -16,19 +16,25 @@ // under the License. 
using System; +using System.IO; using System.Runtime.Serialization; +using System.Runtime.Serialization.Formatters.Binary; using Org.Apache.REEF.Common.Avro; +using Org.Apache.REEF.Common.Exceptions; using Org.Apache.REEF.Driver.Bridge.Clr2java; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Driver.Bridge.Events { internal sealed class FailedTask : IFailedTask { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(FailedTask)); + private static readonly Logger Logger = Logger.GetLogger(typeof(FailedTask)); + + private readonly Exception _cause; public FailedTask(IFailedTaskClr2Java failedTaskClr2Java) { @@ -38,9 +44,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Events Id = avroFailedTask.identifier; Data = Optional<byte[]>.OfNullable(avroFailedTask.data); Message = avroFailedTask.message ?? "No message in Failed Task."; - - // TODO[JIRA REEF-1258]: Fill this in with avroFailedTask.cause. - Cause = Optional<Exception>.Empty(); + _cause = GetCause(avroFailedTask.cause, ByteUtilities.ByteArraysToString(avroFailedTask.data)); // This is always empty, even in Java. 
Description = Optional<string>.Empty(); @@ -56,8 +60,6 @@ namespace Org.Apache.REEF.Driver.Bridge.Events public Optional<string> Description { get; set; } - public Optional<Exception> Cause { get; set; } - public Optional<byte[]> Data { get; set; } [DataMember] @@ -87,7 +89,30 @@ namespace Org.Apache.REEF.Driver.Bridge.Events public Exception AsError() { - throw new NotImplementedException(); + return _cause; + } + + private static Exception GetCause(byte[] serializedCause, string taskExceptionString) + { + if (serializedCause == null) + { + return new JavaTaskException("Task failed with Exception generated by the Java Driver, please inspect the message."); + } + + try + { + using (var memStream = new MemoryStream(serializedCause)) + { + return (Exception)new BinaryFormatter().Deserialize(memStream); + } + } + catch (SerializationException se) + { + Exceptions.Caught(se, Level.Info, + "Exception from Task was not able to be deserialized, returning a NonSerializableException.", Logger); + + return new NonSerializableTaskException(taskExceptionString, se); + } } } } http://git-wip-us.apache.org/repos/asf/reef/blob/02bfc2f4/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj index cb40a47..eebca2c 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj +++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj @@ -138,6 +138,7 @@ under the License. 
<Compile Include="Evaluator\IEvaluatorRequest.cs" /> <Compile Include="Evaluator\IEvaluatorRequestor.cs" /> <Compile Include="Evaluator\IFailedEvaluator.cs" /> + <Compile Include="Task\JavaTaskException.cs" /> <Compile Include="IDriver.cs" /> <Compile Include="IDriverRestarted.cs" /> <Compile Include="IDriverStarted.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/02bfc2f4/lang/cs/Org.Apache.REEF.Driver/Task/JavaTaskException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Task/JavaTaskException.cs b/lang/cs/Org.Apache.REEF.Driver/Task/JavaTaskException.cs new file mode 100644 index 0000000..2e44413 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Task/JavaTaskException.cs @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; + +namespace Org.Apache.REEF.Driver.Task +{ + /// <summary> + /// A Task Exception from the Java side. Generally not expected. 
+ /// </summary> + public sealed class JavaTaskException : Exception + { + internal JavaTaskException(string message) + : base(message) + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/02bfc2f4/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs index 4e64f5a..dfcb9f6 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs @@ -16,12 +16,15 @@ // under the License. using System; +using System.Runtime.Serialization; +using Org.Apache.REEF.Common.Exceptions; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; @@ -33,19 +36,20 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge [Collection("FunctionalTests")] public sealed class TestFailedTaskEventHandler : ReefFunctionalTest { - private const string FailedTaskMessage = "I have successfully seen a failed task."; + private const string FailedTaskMessage = "I have successfully seen all failed tasks."; private const string ExpectedExceptionMessage = "Expected exception."; + private const int NumFailedTasksExpected = 2; [Fact] [Trait("Priority", "1")] [Trait("Category", "FunctionalGated")] - [Trait("Description", "Test invocation of FailedTaskHandler")] + [Trait("Description", "Test invocation of FailedTaskHandler. 
Validates the Task ID of the failure, as well as the Exceptions of the Task failure.")] //// TODO[JIRA REEF-1184]: add timeout 180 sec public void TestFailedTaskEventHandlerOnLocalRuntime() { string testFolder = DefaultRuntimeFolder + TestId; TestRun(DriverConfigurations(), typeof(FailedTaskDriver), 1, "failedTaskTest", "local", testFolder); - ValidateSuccessForLocalRuntime(numberOfContextsToClose: 1, numberOfTasksToFail: 1, testFolder: testFolder); + ValidateSuccessForLocalRuntime(numberOfContextsToClose: 1, numberOfTasksToFail: NumFailedTasksExpected, testFolder: testFolder); ValidateMessageSuccessfullyLoggedForDriver(FailedTaskMessage, testFolder); CleanUp(testFolder); } @@ -63,10 +67,14 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge IObserver<IFailedTask>, IObserver<ICompletedTask> { private const string TaskId = "1234567"; + private static readonly Logger Logger = Logger.GetLogger(typeof(FailedTaskDriver)); private readonly IEvaluatorRequestor _requestor; + private bool _shouldReceiveSerializableException = false; + private int _numFailedTasksReceived = 0; + [Inject] private FailedTaskDriver(IEvaluatorRequestor requestor) { @@ -80,26 +88,65 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge public void OnNext(IAllocatedEvaluator value) { - value.SubmitTask(TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, TaskId) - .Set(TaskConfiguration.Task, GenericType<FailTask>.Class) - .Build()); + value.SubmitTask(GetTaskConfiguration()); } public void OnNext(IFailedTask value) { - Logger.Log(Level.Error, FailedTaskMessage); + _numFailedTasksReceived++; + + if (value.Id != TaskId) + { + throw new Exception("Received Task ID " + value.Id + " instead of the expected Task ID " + TaskId); + } + if (value.Message == null || value.Message != ExpectedExceptionMessage) { throw new Exception("Exception message not properly propagated. 
Received message " + value.Message); } - if (value.Id != TaskId) + if (_shouldReceiveSerializableException) { - throw new Exception("Received Task ID " + value.Id + " instead of the expected Task ID " + TaskId); + if (_numFailedTasksReceived == NumFailedTasksExpected) + { + Logger.Log(Level.Error, FailedTaskMessage); + } + + if (value.AsError() == null || !(value.AsError() is TestSerializableException)) + { + throw new Exception("Exception should have been serialized properly."); + } + + if (value.AsError().Message != ExpectedExceptionMessage) + { + throw new Exception("Incorrect Exception message, got message: " + value.AsError().Message); + } + + value.GetActiveContext().Value.Dispose(); + } + else + { + var taskException = value.AsError(); + if (taskException == null) + { + throw new Exception("Expected a non-null task exception."); + } + + var nonSerializableTaskException = taskException as NonSerializableTaskException; + if (nonSerializableTaskException == null) + { + throw new Exception( + "Expected a NonSerializableTaskException from Task, instead got Exception of type " + taskException.GetType()); + } + + if (!(nonSerializableTaskException.InnerException is SerializationException)) + { + throw new Exception("Expected a SerializationException as the inner Exception of the Task Exception."); + } + + _shouldReceiveSerializableException = true; + value.GetActiveContext().Value.SubmitTask(GetTaskConfiguration()); } - - value.GetActiveContext().Value.Dispose(); } public void OnNext(ICompletedTask value) @@ -116,13 +163,31 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { throw new NotImplementedException(); } + + private IConfiguration GetTaskConfiguration() + { + var shouldThrowSerializableConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindNamedParameter<ShouldThrowSerializableException, bool>( + GenericType<ShouldThrowSerializableException>.Class, _shouldReceiveSerializableException.ToString()) + .Build(); + + var taskConfig = 
TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, TaskId) + .Set(TaskConfiguration.Task, GenericType<FailTask>.Class) + .Build(); + + return Configurations.Merge(shouldThrowSerializableConfig, taskConfig); + } } private sealed class FailTask : ITask { + private readonly bool _shouldThrowSerializableException; + [Inject] - private FailTask() + private FailTask([Parameter(typeof(ShouldThrowSerializableException))] bool shouldThrowSerializableException) { + _shouldThrowSerializableException = shouldThrowSerializableException; } public void Dispose() @@ -131,7 +196,42 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge public byte[] Call(byte[] memento) { - throw new Exception(ExpectedExceptionMessage); + if (_shouldThrowSerializableException) + { + throw new TestSerializableException(ExpectedExceptionMessage); + } + + throw new TestNonSerializableException(ExpectedExceptionMessage); + } + } + + [NamedParameter(documentation: "Used to indicate whether FailTask should throw a Serializable or non-Serializable Exception.")] + private sealed class ShouldThrowSerializableException : Name<bool> + { + private ShouldThrowSerializableException() + { + } + } + + [Serializable] + private sealed class TestSerializableException : Exception + { + public TestSerializableException(string message) + : base(message) + { + } + + public TestSerializableException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } + + private sealed class TestNonSerializableException : Exception + { + public TestNonSerializableException(string message) + : base(message) + { } } }
