Repository: reef Updated Branches: refs/heads/master 5e7d99be3 -> 2ce19a571
[REEF-1778] Ensure ITask.Dispose() is executed This moves the reporting of the result of a task *after* the call to `Dispose` on that task to ensure execution of that method. To do so, I made the calls to the task start and stop handlers explicit, and kept track in the `TaskRuntime` of the exceptions thrown and of whether a result needs to be sent. Also, this fixes a bug in `TaskRuntimeTests`, which did not wait for the spawned thread to finish before validating the test results. JIRA: [REEF-1778](https://issues.apache.org/jira/browse/REEF-1778) Pull Request: This closes #1293 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/2ce19a57 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/2ce19a57 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/2ce19a57 Branch: refs/heads/master Commit: 2ce19a571df020a9f2114d33bb9be92df5cc7ca1 Parents: 5e7d99b Author: Markus Weimer <[email protected]> Authored: Mon Apr 17 18:50:03 2017 -0700 Committer: Julia Wang <[email protected]> Committed: Wed May 3 15:14:24 2017 -0700 ---------------------------------------------------------------------- .../Runtime/Evaluator/Task/TaskRuntime.cs | 51 +++++++++++++++----- .../Runtime/Evaluator/Task/TaskStatus.cs | 31 ++++++++++-- .../TaskRuntimeTests.cs | 2 +- 3 files changed, 67 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/2ce19a57/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 39c51f6..863b30d 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -93,54 +93,81 @@ namespace 
Org.Apache.REEF.Common.Runtime.Evaluator.Task var taskThread = new Thread(() => { + // The result of the task execution. + byte[] resultReturnedByTask = null; + + // Whether or not a result shall be returned to the Driver. + bool returnResultToDriver = true; + + // Exception thrown during `Dispose`, if any. + Exception exceptionThrownByTaskDispose = null; + try { + // Run the handlers for `TaskStart` Logger.Log(Level.Verbose, "Set running status for task"); + _currentStatus.RunTaskStartHandlers(); + + // Update the state _currentStatus.SetRunning(); + + // Call the Task Logger.Log(Level.Verbose, "Calling into user's task."); - var result = _userTask.Call(null); + resultReturnedByTask = _userTask.Call(null); Logger.Log(Level.Info, "Task Call Finished"); - _currentStatus.SetResult(result); - const Level resultLogLevel = Level.Verbose; + // Run the handlers for `TaskStop` + _currentStatus.RunTaskStopHandlers(); - if (Logger.IsLoggable(resultLogLevel) && result != null && result.Length > 0) + // Log the result + const Level resultLogLevel = Level.Verbose; + if (Logger.IsLoggable(resultLogLevel) && resultReturnedByTask != null && resultReturnedByTask.Length > 0) { Logger.Log(resultLogLevel, - "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); + "Task running result:\r\n" + System.Text.Encoding.Default.GetString(resultReturnedByTask)); } } catch (TaskStartHandlerException e) { Logger.Log(Level.Info, "TaskRuntime::TaskStartHandlerException"); _currentStatus.SetException(e.InnerException); + returnResultToDriver = false; } catch (TaskStopHandlerException e) { Logger.Log(Level.Info, "TaskRuntime::TaskStopHandlerException"); _currentStatus.SetException(e.InnerException); + returnResultToDriver = false; } catch (Exception e) { Logger.Log(Level.Info, "TaskRuntime::Exception {0}", e.GetType()); _currentStatus.SetException(e); + returnResultToDriver = false; } finally { try { - if (_userTask != null) - { - _userTask.Dispose(); - } + 
_userTask.Dispose(); } catch (Exception e) { - var msg = "Exception during Task Dispose in task Call()"; - Logger.Log(Level.Error, msg); - throw new InvalidOperationException(msg, e); + exceptionThrownByTaskDispose = new InvalidOperationException("Exception during Task Dispose in task Call()", e); } } + + // Inform the driver about the result. + if (returnResultToDriver) + { + _currentStatus.SetResult(resultReturnedByTask); + } + + // If the ITask.Dispose() method threw an Exception, crash the Evaluator. + if (exceptionThrownByTaskDispose != null) + { + throw exceptionThrownByTaskDispose; + } }); taskThread.Start(); http://git-wip-us.apache.org/repos/asf/reef/blob/2ce19a57/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs index a195333..f906ec9 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs @@ -110,9 +110,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { _result = Optional<byte[]>.OfNullable(result); - // This can throw an Exception. 
- _taskLifeCycle.Stop(); - switch (State) { case TaskState.SuspendRequested: @@ -141,6 +138,24 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } } + /// <summary> + /// Runs the Task Start Handlers + /// </summary> + /// <exception cref="TaskStartHandlerException">If any of the Task Start Handlers throws an exception</exception> + public void RunTaskStartHandlers() + { + _taskLifeCycle.Start(); + } + + /// <summary> + /// Runs the Task Stop Handlers + /// </summary> + /// <exception cref="TaskStopHandlerException">If any of the Task Stop Handlers throws an exception</exception> + public void RunTaskStopHandlers() + { + _taskLifeCycle.Stop(); + } + public void SetRunning() { lock (_heartBeatManager) @@ -148,7 +163,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning"); if (_state == TaskState.Init) { - _taskLifeCycle.Start(); State = TaskState.Running; LOGGER.Log(Level.Verbose, "Sending task Running heartbeat"); Heartbeat(); @@ -202,6 +216,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task return _state != TaskState.Running; } + /// <summary> + /// Check whether the task is in state `Running` + /// </summary> + /// <returns>true, the task is in state `Running`</returns> + public bool IsRunning() + { + return _state == TaskState.Running; + } + public bool HasEnded() { switch (_state) http://git-wip-us.apache.org/repos/asf/reef/blob/2ce19a57/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs index c8078b0..489dcab 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs @@ -79,9 +79,9 @@ namespace Org.Apache.REEF.Evaluator.Tests var task = injector.GetInstance<TestTask>(); 
task.FinishCountdownEvent.Wait(); task.DisposeCountdownEvent.Wait(); + taskThread.Join(); Assert.Equal(taskRuntime.GetTaskState(), TaskState.Done); Assert.True(taskRuntime.HasEnded()); - taskThread.Join(); } /// <summary>
