Repository: reef Updated Branches: refs/heads/master dbd628a31 -> 1a2f120c9
[REEF-1410] Validate Task constructor failure => FailedTask Event This addressed the issue by * Fixing heartbeat failure if task fails to start. * Adding a test to verify Task failure message. JIRA: [REEF-1410](https://issues.apache.org/jira/browse/REEF-1410) Pull Request: This closes #1019 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1a2f120c Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1a2f120c Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1a2f120c Branch: refs/heads/master Commit: 1a2f120c975bc5ff5c4e10eb21e3b86d78f7bc58 Parents: dbd628a Author: Andrew Chung <[email protected]> Authored: Tue May 31 14:00:42 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Sat Jun 4 07:58:25 2016 -0700 ---------------------------------------------------------------------- .../NonSerializableEvaluatorException.cs | 26 ++- .../Exceptions/NonSerializableTaskException.cs | 26 ++- .../Runtime/Evaluator/Context/ContextManager.cs | 31 +++- .../Runtime/Evaluator/Context/ContextRuntime.cs | 26 ++- .../Runtime/Evaluator/EvaluatorRuntime.cs | 20 +-- .../Evaluator/Task/TaskClientCodeException.cs | 78 ++++++++- .../Runtime/Evaluator/Task/TaskRuntime.cs | 11 +- .../Runtime/Evaluator/Task/TaskStatus.cs | 18 +- .../Bridge/Events/FailedEvaluator.cs | 19 +- .../Bridge/Events/FailedTask.cs | 9 +- .../ContextRuntimeTests.cs | 10 +- .../EvaluatorServiceTests.cs | 2 +- .../TaskRuntimeTests.cs | 16 +- .../User/TaskConstructorExceptionTest.cs | 175 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 1 + .../Org.Apache.REEF.Utilities/ByteUtilities.cs | 40 ++++- 16 files changed, 420 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableEvaluatorException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableEvaluatorException.cs b/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableEvaluatorException.cs index 8ce0913..4dbf59a 100644 --- a/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableEvaluatorException.cs +++ b/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableEvaluatorException.cs @@ -27,14 +27,34 @@ namespace Org.Apache.REEF.Common.Exceptions [Serializable] public sealed class NonSerializableEvaluatorException : Exception { - public NonSerializableEvaluatorException(string message, SerializationException serializationException) - : base(message, serializationException) + internal static NonSerializableEvaluatorException UnableToSerialize(Exception originalException, SerializationException serializationException) { + return new NonSerializableEvaluatorException(originalException, serializationException); } - public NonSerializableEvaluatorException(SerializationInfo info, StreamingContext context) + internal static NonSerializableEvaluatorException UnableToDeserialize(string exceptionString, SerializationException serializationException) + { + return new NonSerializableEvaluatorException(exceptionString, serializationException); + } + + private NonSerializableEvaluatorException(Exception originalException, SerializationException serializationException) + : base(GetNonSerializableExceptionMessage(originalException), serializationException) + { + } + + private NonSerializableEvaluatorException(string exceptionString, SerializationException serializationException) + : base(exceptionString, serializationException) + { + } + + private NonSerializableEvaluatorException(SerializationInfo info, StreamingContext context) : base(info, context) { } + + private static string GetNonSerializableExceptionMessage(Exception e) + { + return string.Format("Unable to serialize the original Evaluator Exception. Original Exception.ToString(): {0}", e); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableTaskException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableTaskException.cs b/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableTaskException.cs index 76543b4..41a29c2 100644 --- a/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableTaskException.cs +++ b/lang/cs/Org.Apache.REEF.Common/Exceptions/NonSerializableTaskException.cs @@ -27,14 +27,34 @@ namespace Org.Apache.REEF.Common.Exceptions [Serializable] public sealed class NonSerializableTaskException : Exception { - public NonSerializableTaskException(string message, SerializationException serializationException) - : base(message, serializationException) + internal static NonSerializableTaskException UnableToSerialize(Exception originalException, SerializationException serializationException) { + return new NonSerializableTaskException(originalException, serializationException); } - public NonSerializableTaskException(SerializationInfo info, StreamingContext context) + internal static NonSerializableTaskException UnableToDeserialize(string exceptionString, SerializationException serializationException) + { + return new NonSerializableTaskException(exceptionString, serializationException); + } + + private NonSerializableTaskException(Exception originalException, SerializationException serializationException) + : base(GetNonSerializableExceptionMessage(originalException), serializationException) + { + } + + private NonSerializableTaskException(string exceptionString, SerializationException serializationException) + : base(exceptionString, serializationException) + { + } + + private NonSerializableTaskException(SerializationInfo info, StreamingContext context) : base(info, context) { } + + private static string GetNonSerializableExceptionMessage(Exception e) + { + return string.Format("Unable to serialize the original Task Exception. Original Exception.ToString(): {0}", e); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs index 1d3fed2..3f7c5ae 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs @@ -20,6 +20,8 @@ using System.Collections.Generic; using System.Collections.ObjectModel; using System.Globalization; using System.Linq; +using System.Runtime.Serialization; +using Org.Apache.REEF.Common.Avro; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Runtime.Evaluator.Task; using Org.Apache.REEF.Common.Tasks; @@ -65,7 +67,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context LOGGER.Log(Level.Info, "Launching the initial Task"); try { - _topContext.StartTask(_rootContextLauncher.RootTaskConfig.Value); + _topContext.StartTaskOnNewThread(_rootContextLauncher.RootTaskConfig.Value); } catch (TaskClientCodeException e) { @@ -338,7 +340,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context } var configuration = _serializer.FromString(startTaskProto.configuration); - currentActiveContext.StartTask(configuration); + currentActiveContext.StartTaskOnNewThread(configuration); } } @@ -349,14 +351,33 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context private void HandleTaskException(TaskClientCodeException e) { LOGGER.Log(Level.Error, "TaskClientCodeException", e); - byte[] exception = ByteUtilities.StringToByteArrays(e.ToString()); - TaskStatusProto taskStatus = new TaskStatusProto() + byte[] error; + try + { + error = ByteUtilities.SerializeToBinaryFormat(e); + } + catch (SerializationException se) + { + error = ByteUtilities.SerializeToBinaryFormat( + TaskClientCodeException.CreateWithNonSerializableInnerException(e, se)); + } + + var avroFailedTask = new AvroFailedTask + { + identifier = e.TaskId, + cause = error, + data = ByteUtilities.StringToByteArrays(e.ToString()), + message = e.Message + }; + + var taskStatus = new TaskStatusProto { context_id = e.ContextId, task_id = e.TaskId, - result = exception, + result = AvroJsonSerializer<AvroFailedTask>.ToBytes(avroFailedTask), state = State.FAILED }; + LOGGER.Log(Level.Error, "Sending Heartbeat for a failed task: {0}", taskStatus); _heartBeatManager.OnNext(taskStatus); } http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs index 4d785b6..740bb8d 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs @@ -27,6 +27,7 @@ using Org.Apache.REEF.Common.Runtime.Evaluator.Task; using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Attributes; @@ -267,7 +268,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// Launches an Task on this context. /// </summary> /// <param name="taskConfiguration"></param> - public Thread StartTask(IConfiguration taskConfiguration) + public Thread StartTaskOnNewThread(IConfiguration taskConfiguration) { lock (_contextLifeCycle) { @@ -296,19 +297,32 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context var taskInjector = _contextInjector.ForkInjector(taskConfiguration); - var taskRuntime = taskInjector.GetInstance<TaskRuntime>(); - try { + var taskRuntime = taskInjector.GetInstance<TaskRuntime>(); _task = Optional<TaskRuntime>.Of(taskRuntime); - return taskRuntime.RunTask(); + return taskRuntime.StartTaskOnNewThread(); } - catch (Exception e) + catch (InjectionException e) { - var ex = new TaskClientCodeException(string.Empty, Id, "Unable to run the new task", e); + var taskId = string.Empty; + try + { + taskId = taskInjector.GetNamedInstance<TaskConfigurationOptions.Identifier, string>(); + } + catch (Exception) + { + LOGGER.Log(Level.Error, "Unable to get Task ID from TaskConfiguration."); + } + + var ex = TaskClientCodeException.Create(taskId, Id, "Unable to run the new task", e); Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER); return null; } + catch (Exception e) + { + throw new SystemException("System error in starting Task.", e); + } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs index 973ea3c..80f0ce5 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs @@ -17,13 +17,12 @@ using System; using System.Globalization; -using System.IO; using System.Runtime.Serialization; -using System.Runtime.Serialization.Formatters.Binary; using Org.Apache.REEF.Common.Exceptions; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Runtime.Evaluator.Context; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Time; @@ -254,24 +253,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator try { - using (var memStream = new MemoryStream()) - { - var formatter = new BinaryFormatter(); - formatter.Serialize(memStream, e); - errorBytes = memStream.ToArray(); - } + errorBytes = ByteUtilities.SerializeToBinaryFormat(e); } catch (SerializationException se) { - using (var memStream = new MemoryStream()) - { - var formatter = new BinaryFormatter(); - formatter.Serialize(memStream, new NonSerializableEvaluatorException(e.ToString(), se)); - errorBytes = memStream.ToArray(); - } + errorBytes = ByteUtilities.SerializeToBinaryFormat( + NonSerializableEvaluatorException.UnableToSerialize(e, se)); } - var evaluatorStatusProto = new EvaluatorStatusProto() + var evaluatorStatusProto = new EvaluatorStatusProto { evaluator_id = _evaluatorId, error = errorBytes, http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs index bc40007..47ceb8d 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs @@ -16,15 +16,44 @@ // under the License. using System; -using Org.Apache.REEF.Tang.Interface; +using System.Runtime.Serialization; +using System.Security.Permissions; +using Org.Apache.REEF.Common.Exceptions; namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { - internal sealed class TaskClientCodeException : Exception + /// <summary> + /// An Exception thrown when Task operations (Start, Stop, Suspend) fail. + /// </summary> + [Serializable] + public sealed class TaskClientCodeException : Exception { - private readonly string _taskId; + private const string TaskIdStr = "TaskId"; + private const string ContextIdStr = "ContextId"; private readonly string _contextId; + private readonly string _taskId; + + internal static TaskClientCodeException Create( + string taskId, + string contextId, + string message, + Exception cause) + { + return new TaskClientCodeException(taskId, contextId, message, cause); + } + + internal static TaskClientCodeException CreateWithNonSerializableInnerException( + TaskClientCodeException e, SerializationException serializationException) + { + var nonSerializableTaskException = NonSerializableTaskException.UnableToSerialize(e.InnerException, serializationException); + + return new TaskClientCodeException( + e.TaskId, + e.ContextId, + string.Format("Unable to serialize Task control message. TaskClientCodeException message: {0}", e.Message), + nonSerializableTaskException); + } /// <summary> /// construct the exception that caused the Task to fail @@ -33,7 +62,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task /// <param name="contextId"> the id of the context the failed Task was executing in.</param> /// <param name="message"> the error message </param> /// <param name="cause"> the exception that caused the Task to fail.</param> - public TaskClientCodeException( + private TaskClientCodeException( string taskId, string contextId, string message, @@ -44,7 +73,33 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task _contextId = contextId; } - public string TaskId + /// <summary> + /// construct the exception that caused the Task to fail + /// </summary> + /// <param name="taskId"> the id of the failed task.</param> + /// <param name="contextId"> the id of the context the failed Task was executing in.</param> + /// <param name="message"> the error message </param> + private TaskClientCodeException( + string taskId, + string contextId, + string message) + : base(message) + { + _taskId = taskId; + _contextId = contextId; + } + + /// <summary> + /// Constructor used for serialization. + /// </summary> + private TaskClientCodeException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + _taskId = info.GetString(TaskIdStr); + _contextId = info.GetString(ContextIdStr); + } + + public string TaskId { get { return _taskId; } } @@ -54,10 +109,17 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task get { return _contextId; } } - public static string GetTaskIdentifier(IConfiguration c) + [SecurityPermission(SecurityAction.Demand, SerializationFormatter = true)] + public override void GetObjectData(SerializationInfo info, StreamingContext context) { - // TODO: update after TANG is available - return string.Empty; + if (info == null) + { + throw new ArgumentNullException("info"); + } + + info.AddValue(TaskIdStr, TaskId); + info.AddValue(ContextIdStr, ContextId); + base.GetObjectData(info, context); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index cfc8c3b..f1247cf 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -81,11 +81,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task /// <summary> /// Runs the task asynchronously. /// </summary> - public Thread RunTask() + public Thread StartTaskOnNewThread() { if (Interlocked.Exchange(ref _taskRan, 1) != 0) { - // Return if we have already called RunTask + // Return if we have already called StartTaskOnNewThread throw new InvalidOperationException("TaskRun has already been called on TaskRuntime."); } @@ -170,7 +170,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task catch (Exception e) { Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", Logger); - _currentStatus.SetException(new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e)); + _currentStatus.SetException(TaskClientCodeException.Create( + TaskId, ContextId, "Error during Close().", e)); } } @@ -192,7 +193,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", Logger); _currentStatus.SetException( - new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e)); + TaskClientCodeException.Create(TaskId, ContextId, "Error during Suspend().", e)); } } @@ -211,7 +212,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", Logger); _currentStatus.SetException( - new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e)); + TaskClientCodeException.Create(TaskId, ContextId, "Error during message delivery.", e)); } } http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs index ae0141e..42c62ad 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs @@ -18,9 +18,7 @@ using System; using System.Collections.Generic; using System.Globalization; -using System.IO; using System.Runtime.Serialization; -using System.Runtime.Serialization.Formatters.Binary; using Org.Apache.REEF.Common.Avro; using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Exceptions; @@ -38,8 +36,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStatus)); - private readonly BinaryFormatter _binaryFormatter = new BinaryFormatter(); - private readonly TaskLifeCycle _taskLifeCycle; private readonly IHeartBeatManager _heartBeatManager; private readonly Optional<ISet<ITaskMessageSource>> _evaluatorMessageSources; @@ -256,11 +252,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task byte[] error; try { - error = SerializeException(_lastException.Value); + error = ByteUtilities.SerializeToBinaryFormat(_lastException.Value); } catch (SerializationException se) { - error = SerializeException(new NonSerializableTaskException(_lastException.Value.ToString(), se)); + error = ByteUtilities.SerializeToBinaryFormat( + NonSerializableTaskException.UnableToSerialize(_lastException.Value, se)); } var avroFailedTask = new AvroFailedTask @@ -289,15 +286,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } } - private byte[] SerializeException(Exception ex) - { - using (var memStream = new MemoryStream()) - { - _binaryFormatter.Serialize(memStream, ex); - return memStream.ToArray(); - } - } - private static bool IsLegalStateTransition(TaskState? from, TaskState to) { if (from == null) http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs index 337b793..de1264a 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs @@ -17,10 +17,9 @@ using System; using System.Collections.Generic; -using System.IO; using System.Linq; using System.Runtime.Serialization; -using System.Runtime.Serialization.Formatters.Binary; +using Org.Apache.REEF.Common.Exceptions; using Org.Apache.REEF.Driver.Bridge.Clr2java; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; @@ -45,15 +44,21 @@ namespace Org.Apache.REEF.Driver.Bridge.Events new FailedContext(clr2JavaFailedContext))); var errorBytes = FailedEvaluatorClr2Java.GetErrorBytes(); - if (errorBytes != null) + if (errorBytes != null && errorBytes.Length != 0) { // When the Exception originates from the C# side. - var formatter = new BinaryFormatter(); - using (var memStream = new MemoryStream(errorBytes)) + Exception inner; + try { - var inner = (Exception)formatter.Deserialize(memStream); - _evaluatorException = new EvaluatorException(_id, inner.Message, inner); + inner = (Exception)ByteUtilities.DeserializeFromBinaryFormat(errorBytes); } + catch (SerializationException se) + { + inner = NonSerializableEvaluatorException.UnableToDeserialize( + "Exception from Evaluator was not able to be deserialized, returning a NonSerializableEvaluatorException.", se); + } + + _evaluatorException = new EvaluatorException(_id, inner.Message, inner); } else { http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs index 31d9ca9..8e51123 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs @@ -101,17 +101,14 @@ namespace Org.Apache.REEF.Driver.Bridge.Events try { - using (var memStream = new MemoryStream(serializedCause)) - { - return (Exception)new BinaryFormatter().Deserialize(memStream); - } + return (Exception)ByteUtilities.DeserializeFromBinaryFormat(serializedCause); } catch (SerializationException se) { Exceptions.Caught(se, Level.Info, - "Exception from Task was not able to be deserialized, returning a NonSerializableException.", Logger); + "Exception from Task was not able to be deserialized, returning a NonSerializableTaskException.", Logger); - return new NonSerializableTaskException(taskExceptionString, se); + return NonSerializableTaskException.UnableToDeserialize(taskExceptionString, se); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs index 8fd43b4..02a9724 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs @@ -180,7 +180,7 @@ namespace Org.Apache.REEF.Evaluator.Tests { var hbMgr = Substitute.For<IHeartBeatManager>(); contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, hbMgr); - taskThread = contextRuntime.StartTask(taskConfig); + taskThread = contextRuntime.StartTaskOnNewThread(taskConfig); Assert.True(contextRuntime.TaskRuntime.IsPresent()); Assert.True(contextRuntime.GetTaskStatus().IsPresent()); @@ -231,13 +231,13 @@ namespace Org.Apache.REEF.Evaluator.Tests { var hbMgr = Substitute.For<IHeartBeatManager>(); contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, hbMgr); - taskThread = contextRuntime.StartTask(taskConfig); + taskThread = contextRuntime.StartTaskOnNewThread(taskConfig); Assert.True(contextRuntime.TaskRuntime.IsPresent()); Assert.True(contextRuntime.GetTaskStatus().IsPresent()); Assert.Equal(contextRuntime.GetTaskStatus().Value.state, State.RUNNING); - Assert.Throws<InvalidOperationException>(() => contextRuntime.StartTask(taskConfig)); + Assert.Throws<InvalidOperationException>(() => contextRuntime.StartTaskOnNewThread(taskConfig)); } finally { @@ -275,7 +275,7 @@ namespace Org.Apache.REEF.Evaluator.Tests var hbMgr = Substitute.For<IHeartBeatManager>(); contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, hbMgr); - var taskThread = contextRuntime.StartTask(taskConfig); + var taskThread = contextRuntime.StartTaskOnNewThread(taskConfig); var testTask = contextRuntime.TaskRuntime.Value.Task as TestTask; if (testTask == null) { @@ -288,7 +288,7 @@ namespace Org.Apache.REEF.Evaluator.Tests taskThread.Join(); - taskThread = contextRuntime.StartTask(taskConfig); + taskThread = contextRuntime.StartTaskOnNewThread(taskConfig); Assert.Equal(contextRuntime.GetTaskStatus().Value.state, State.RUNNING); var secondTestTask = contextRuntime.TaskRuntime.Value.Task as TestTask; http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs index e8ece72..476e24b 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs @@ -235,7 +235,7 @@ namespace Org.Apache.REEF.Evaluator.Tests serviceInjector = rootContext.ServiceInjector; for (var i = 0; i < tasksRun; i++) { - rootContext.StartTask(launcher.RootTaskConfig.Value).Join(); + rootContext.StartTaskOnNewThread(launcher.RootTaskConfig.Value).Join(); } Assert.NotNull(serviceInjector); http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs index 3f452c4..90a5d18 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs @@ -76,7 +76,7 @@ namespace Org.Apache.REEF.Evaluator.Tests { var injector = GetInjector(); var taskRuntime = injector.GetInstance<TaskRuntime>(); - var taskThread = taskRuntime.RunTask(); + var taskThread = taskRuntime.StartTaskOnNewThread(); var task = injector.GetInstance<TestTask>(); task.FinishCountdownEvent.Wait(); task.DisposeCountdownEvent.Wait(); @@ -93,7 +93,7 @@ namespace Org.Apache.REEF.Evaluator.Tests { var injector = GetInjector(typeof(ExceptionAction)); var taskRuntime = injector.GetInstance<TaskRuntime>(); - var taskThread = taskRuntime.RunTask(); + var taskThread = taskRuntime.StartTaskOnNewThread(); var task = injector.GetInstance<TestTask>(); task.DisposeCountdownEvent.Wait(); Assert.Equal(taskRuntime.GetTaskState(), TaskState.Failed); @@ -120,7 +120,7 @@ namespace Org.Apache.REEF.Evaluator.Tests Assert.Equal(statusProto.state, State.INIT); Assert.Equal(taskRuntime.GetTaskState(), TaskState.Init); - var taskThread = taskRuntime.RunTask(); + var taskThread = taskRuntime.StartTaskOnNewThread(); Assert.Equal(taskRuntime.GetStatusProto().state, State.RUNNING); Assert.Equal(taskRuntime.GetTaskState(), TaskState.Running); @@ -168,7 +168,7 @@ namespace Org.Apache.REEF.Evaluator.Tests throw new Exception("Event handler is not expected to be null."); } - var taskThread = taskRuntime.RunTask(); + var taskThread = taskRuntime.StartTaskOnNewThread(); Assert.True(testTaskEventStartHandler.StartInvoked.IsPresent()); Assert.Equal(testTaskEventStartHandler.StartInvoked.Value, taskId); @@ -212,7 +212,7 @@ namespace Org.Apache.REEF.Evaluator.Tests var injector = GetInjector(typeof(ExceptionAction), contextId, taskId); var taskRuntime = injector.GetInstance<TaskRuntime>(); - var taskThread = taskRuntime.RunTask(); + var taskThread = taskRuntime.StartTaskOnNewThread(); var task = injector.GetInstance<TestTask>(); task.FinishCountdownEvent.Wait(); @@ -243,7 +243,7 @@ namespace Org.Apache.REEF.Evaluator.Tests var injector = GetInjector(typeof(CountDownAction), contextId, taskId); var taskRuntime = injector.GetInstance<TaskRuntime>(); - var taskThread = taskRuntime.RunTask(); + var taskThread = taskRuntime.StartTaskOnNewThread(); var taskInterface = injector.GetInstance<ITask>(); Assert.True(taskInterface is TestTask); @@ -275,7 +275,7 @@ namespace Org.Apache.REEF.Evaluator.Tests var injector = GetInjector(contextId, taskId); var taskRuntime = injector.GetInstance<TaskRuntime>(); - var taskThread = taskRuntime.RunTask(); + var taskThread = taskRuntime.StartTaskOnNewThread(); var taskInterface = injector.GetInstance<ITask>(); Assert.True(taskInterface is TestTask); @@ -316,7 +316,7 @@ namespace Org.Apache.REEF.Evaluator.Tests var injector = GetInjector(typeof(ExceptionAction), contextId, taskId); var taskRuntime = injector.GetInstance<TaskRuntime>(); - var taskThread = taskRuntime.RunTask(); + var taskThread = taskRuntime.StartTaskOnNewThread(); var task = injector.GetInstance<TestTask>(); http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskConstructorExceptionTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskConstructorExceptionTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskConstructorExceptionTest.cs new file mode 100644 index 0000000..0f367b9 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskConstructorExceptionTest.cs @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Runtime.Evaluator.Task; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Failure.User +{ + /// <summary> + /// This test class contains a test that validates that an Exception in an <see cref="ITask"/> + /// constructor triggers an <see cref="IFailedTask"/> event. + /// </summary> + [Collection("FunctionalTests")] + public sealed class TaskConstructorExceptionTest : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TaskConstructorExceptionTest)); + + private const string TaskId = "TaskID"; + private const string ContextId = "ContextID"; + + /// <summary> + /// This test validates that an Exception in an <see cref="ITask"/> constructor triggers + /// a <see cref="IFailedTask"/> event. + /// </summary> + [Fact] + public void TestTaskConstructorException() + { + var testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + + TestRun( + DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<TestTaskConstructorExceptionDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<TestTaskConstructorExceptionDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<TestTaskConstructorExceptionDriver>.Class) + .Build(), + typeof(TestTaskConstructorExceptionDriver), 1, "TestTaskConstructorException", "local", testFolder); + + ValidateSuccessForLocalRuntime(numberOfContextsToClose: 1, numberOfTasksToFail: 1, testFolder: testFolder); + ValidateMessageSuccessfullyLoggedForDriver(TestTaskConstructorExceptionDriver.ReceivedFailedTaskEvent, testFolder); + CleanUp(testFolder); + } + + private sealed class TestTaskConstructorExceptionDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IFailedTask> + { + public const string ReceivedFailedTaskEvent = "ReceivedFailedTaskEvent"; + + private readonly IEvaluatorRequestor _requestor; + + [Inject] + private TestTaskConstructorExceptionDriver(IEvaluatorRequestor requestor) + { + _requestor = requestor; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + var contextConf = ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, ContextId) + .Build(); + + var taskConf = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, TaskId) + .Set(TaskConfiguration.Task, GenericType<TestConstructorExceptionTask>.Class) + .Build(); + + value.SubmitContextAndTask(contextConf, taskConf); + } + + public void OnNext(IFailedTask value) + { + var taskClientCodeEx = value.AsError() as TaskClientCodeException; + if (taskClientCodeEx == null) + { + throw new Exception("Expected Exception to be a TaskClientCodeException."); + } + + if (taskClientCodeEx.ContextId != ContextId) + { + throw new Exception("Expected Context ID to be " + ContextId + ", but instead got " + taskClientCodeEx.ContextId); + } + + if (taskClientCodeEx.TaskId != TaskId) + { + throw new Exception("Expected Task ID to be " + TaskId + ", but instead got " + taskClientCodeEx.TaskId); + } + + Exception error = taskClientCodeEx; + + var foundErrorMessage = false; + while (error != null) + { + // Using Contains because the Exception may not be serializable + // and the message of the wrapping Exception may be expanded to include more details. + if (error.Message.Contains(TestConstructorExceptionTask.ConstructorErrorMessage)) + { + foundErrorMessage = true; + break; + } + + error = error.InnerException; + } + + if (!foundErrorMessage) + { + throw new Exception("Expected to find error message " + + TestConstructorExceptionTask.ConstructorErrorMessage + " in the layer of Exceptions."); + } + + Logger.Log(Level.Info, ReceivedFailedTaskEvent); + value.GetActiveContext().Value.Dispose(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + private sealed class TestConstructorExceptionTask : ITask + { + public const string ConstructorErrorMessage = "ConstructorErrorMessage"; + + [Inject] + private TestConstructorExceptionTask() + { + throw new Exception(ConstructorErrorMessage); + } + + public void Dispose() + { + } + + public byte[] Call(byte[] memento) + { + throw new NotImplementedException(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 932d3f2..ea695d4 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -96,6 +96,7 @@ under the License. <Compile Include="Functional\Failure\TestEvaluatorWithActiveContextImmediatePoison.cs" /> <Compile Include="Functional\Failure\TestEvaluatorWithRunningTaskImmediatePoison.cs" /> <Compile Include="Functional\Failure\SleepTask.cs" /> + <Compile Include="Functional\Failure\User\TaskConstructorExceptionTest.cs" /> <Compile Include="Functional\FaultTolerant\TestContextStart.cs" /> <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" /> <Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/1a2f120c/lang/cs/Org.Apache.REEF.Utilities/ByteUtilities.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Utilities/ByteUtilities.cs b/lang/cs/Org.Apache.REEF.Utilities/ByteUtilities.cs index b69930b..4c79c4e 100644 --- a/lang/cs/Org.Apache.REEF.Utilities/ByteUtilities.cs +++ b/lang/cs/Org.Apache.REEF.Utilities/ByteUtilities.cs @@ -16,22 +16,34 @@ // under the License. using System; +using System.IO; +using System.Runtime.Serialization; +using System.Runtime.Serialization.Formatters.Binary; using System.Text; namespace Org.Apache.REEF.Utilities { - public class ByteUtilities + public static class ByteUtilities { + /// <summary> + /// Converts a string to a UTF-8 encoded byte array. + /// </summary> public static byte[] StringToByteArrays(string s) { return Encoding.UTF8.GetBytes(s); } + /// <summary> + /// Converts from a UTF-8 encoded byte array to a string. + /// </summary> public static string ByteArraysToString(byte[] b) { return Encoding.UTF8.GetString(b); } + /// <summary> + /// Performs a deep copy of a byte array. + /// </summary> public static byte[] CopyBytesFrom(byte[] from) { int length = Buffer.ByteLength(from); @@ -39,5 +51,31 @@ namespace Org.Apache.REEF.Utilities Buffer.BlockCopy(from, 0, to, 0, length); return to; } + + /// <summary> + /// Serializes object to a byte array with a <see cref="BinaryFormatter"/>. + /// </summary> + /// <exception cref="SerializationException">When serialization fails.</exception> + public static byte[] SerializeToBinaryFormat(object obj) + { + using (var memStream = new MemoryStream()) + { + new BinaryFormatter().Serialize(memStream, obj); + return memStream.ToArray(); + } + } + + /// <summary> + /// Deserializes object from a byte array with a <see cref="BinaryFormatter"/>. + /// </summary> + /// <exception cref="SerializationException">When deserialization fails.</exception> + public static object DeserializeFromBinaryFormat(byte[] bytes) + { + var formatter = new BinaryFormatter(); + using (var memStream = new MemoryStream(bytes)) + { + return formatter.Deserialize(memStream); + } + } } }
