Repository: reef Updated Branches: refs/heads/master 23f5770da -> a304daac9
[REEF-1377] Move from C# Tasks to Threads to run user's Task This * moves from Tasks to Threads, * switches the behavior of tests to reflect the move and * marks HeartBeatManager and TaskStatus as ThreadSafe. JIRA: [REEF-1377](https://issues.apache.org/jira/browse/REEF-1377) Pull Request: This closes #984 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/a304daac Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/a304daac Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/a304daac Branch: refs/heads/master Commit: a304daac92e17abb65ce1ccd25522928a402d6ea Parents: 23f5770 Author: Andrew Chung <[email protected]> Authored: Mon May 2 11:40:44 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Mon May 2 14:34:01 2016 -0700 ---------------------------------------------------------------------- .../Runtime/Evaluator/Context/ContextRuntime.cs | 3 +- .../Runtime/Evaluator/HeartBeatManager.cs | 2 + .../Runtime/Evaluator/Task/TaskRuntime.cs | 87 +++++++------------- .../Runtime/Evaluator/Task/TaskStatus.cs | 2 + .../ContextRuntimeTests.cs | 32 +++++-- .../EvaluatorServiceTests.cs | 12 +-- .../TaskRuntimeTests.cs | 31 +++++-- 7 files changed, 86 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs index cd4b2d3..ffb5b3c 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs @@ -19,6 +19,7 @@ using System; using System.Collections.Generic; using System.Globalization; 
using System.Linq; +using System.Threading; using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Events; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; @@ -266,7 +267,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// Launches an Task on this context. /// </summary> /// <param name="taskConfiguration"></param> - public System.Threading.Tasks.Task StartTask(IConfiguration taskConfiguration) + public Thread StartTask(IConfiguration taskConfiguration) { lock (_contextLifeCycle) { http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs index becdc7a..db422b1 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs @@ -29,6 +29,7 @@ using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.InjectionPlan; using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; @@ -37,6 +38,7 @@ using Org.Apache.REEF.Wake.Time.Event; namespace Org.Apache.REEF.Common.Runtime.Evaluator { + [ThreadSafe] internal sealed class HeartBeatManager : IHeartBeatManager { private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager)); http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 970a36c..654706a 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -82,7 +82,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task /// <summary> /// Runs the task asynchronously. /// </summary> - public System.Threading.Tasks.Task RunTask() + public Thread RunTask() { if (Interlocked.Exchange(ref _taskRan, 1) != 0) { @@ -93,77 +93,46 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task // Send heartbeat such that user receives a TaskRunning message. _currentStatus.SetRunning(); - return System.Threading.Tasks.Task.Run(() => + var taskThread = new Thread(() => { - Logger.Log(Level.Info, "Calling into user's task."); - return _userTask.Call(null); - }).ContinueWith((System.Threading.Tasks.Task<byte[]> runTask) => + try { - try - { - // Task failed. - if (runTask.IsFaulted) - { - OnTaskFailure(runTask); - return; - } - - if (runTask.IsCanceled) - { - Logger.Log(Level.Error, - string.Format(CultureInfo.InvariantCulture, "Task failed caused by System.Threading.Task cancellation")); - OnTaskFailure(runTask); - return; - } + Logger.Log(Level.Verbose, "Calling into user's task."); + var result = _userTask.Call(null); + Logger.Log(Level.Info, "Task Call Finished"); + _currentStatus.SetResult(result); - // Task completed. 
- var result = runTask.Result; - Logger.Log(Level.Info, "Task Call Finished"); - _currentStatus.SetResult(result); + const Level resultLogLevel = Level.Verbose; - const Level resultLogLevel = Level.Verbose; - - if (Logger.CustomLevel >= resultLogLevel && result != null && result.Length > 0) - { - Logger.Log(resultLogLevel, - "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); - } - } - catch (Exception) + if (Logger.CustomLevel >= resultLogLevel && result != null && result.Length > 0) { - // TODO[JIRA REEF-1364]: Properly handle Exceptions and send a message to the Driver. - Logger.Log(Level.Error, "Received uncaught System Exception, force shutting down the Evaluator."); - - Environment.Exit(1); + Logger.Log(resultLogLevel, + "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); } - finally + } + catch (Exception e) + { + _currentStatus.SetException(e); + } + finally + { + try { if (_userTask != null) { _userTask.Dispose(); } - - runTask.Dispose(); } - }); - } + catch (Exception e) + { + Utilities.Diagnostics.Exceptions.Caught( + e, Level.Error, "Exception in disposing Task but ignoring as Task has already completed.", Logger); + } + } + }); - /// <summary> - /// Sets the current status of the Task with the Exception it failed with. - /// </summary> - private void OnTaskFailure(System.Threading.Tasks.Task runTask) - { - if (runTask.Exception == null) - { - _currentStatus.SetException(new SystemException("Task failed without an Exception.")); - } - else - { - var aggregateException = runTask.Exception.Flatten(); - _currentStatus.SetException( - aggregateException.InnerExceptions.Count == 1 ? 
- aggregateException.InnerExceptions.First() : aggregateException); - } + taskThread.Start(); + return taskThread; } public TaskState GetTaskState() http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs index f59184c..3e8a128 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs @@ -28,10 +28,12 @@ using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { + [ThreadSafe] internal sealed class TaskStatus { private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStatus)); http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs index 484f5a4..8fd43b4 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs @@ -175,11 +175,12 @@ namespace Org.Apache.REEF.Evaluator.Tests .Set(TaskConfiguration.Identifier, "ID") .Build(); + Thread taskThread = null; try { var hbMgr = Substitute.For<IHeartBeatManager>(); contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, hbMgr); - contextRuntime.StartTask(taskConfig); + taskThread = 
contextRuntime.StartTask(taskConfig); Assert.True(contextRuntime.TaskRuntime.IsPresent()); Assert.True(contextRuntime.GetTaskStatus().IsPresent()); @@ -200,6 +201,11 @@ namespace Org.Apache.REEF.Evaluator.Tests testTask.CountDownEvent.Signal(); testTask.DisposedEvent.Wait(); + + if (taskThread != null) + { + taskThread.Join(); + } } } } @@ -207,7 +213,7 @@ namespace Org.Apache.REEF.Evaluator.Tests [Fact] [Trait("Priority", "0")] [Trait("Category", "Unit")] - public async Task TestUnableToRunMultipleTasksAtTheSameTime() + public void TestUnableToRunMultipleTasksAtTheSameTime() { var serviceInjector = TangFactory.GetTang().NewInjector(); var contextConfig = GetSimpleContextConfiguration(); @@ -219,18 +225,19 @@ namespace Org.Apache.REEF.Evaluator.Tests .Set(TaskConfiguration.Identifier, "ID") .Build(); + Thread taskThread = null; + try { var hbMgr = Substitute.For<IHeartBeatManager>(); contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, hbMgr); - var t = contextRuntime.StartTask(taskConfig); + taskThread = contextRuntime.StartTask(taskConfig); Assert.True(contextRuntime.TaskRuntime.IsPresent()); Assert.True(contextRuntime.GetTaskStatus().IsPresent()); Assert.Equal(contextRuntime.GetTaskStatus().Value.state, State.RUNNING); - await Assert.ThrowsAsync<InvalidOperationException>( - () => contextRuntime.StartTask(taskConfig)); + Assert.Throws<InvalidOperationException>(() => contextRuntime.StartTask(taskConfig)); } finally { @@ -241,7 +248,10 @@ namespace Org.Apache.REEF.Evaluator.Tests } testTask.CountDownEvent.Signal(); - testTask.DisposedEvent.Wait(); + if (taskThread != null) + { + taskThread.Join(); + } } } } @@ -264,7 +274,8 @@ namespace Org.Apache.REEF.Evaluator.Tests var hbMgr = Substitute.For<IHeartBeatManager>(); contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, hbMgr); - contextRuntime.StartTask(taskConfig); + + var taskThread = contextRuntime.StartTask(taskConfig); var testTask = 
contextRuntime.TaskRuntime.Value.Task as TestTask; if (testTask == null) { @@ -274,9 +285,10 @@ namespace Org.Apache.REEF.Evaluator.Tests testTask.CountDownEvent.Signal(); testTask.StopEvent.Wait(); Assert.False(contextRuntime.GetTaskStatus().IsPresent()); - testTask.DisposedEvent.Wait(); - contextRuntime.StartTask(taskConfig); + taskThread.Join(); + + taskThread = contextRuntime.StartTask(taskConfig); Assert.Equal(contextRuntime.GetTaskStatus().Value.state, State.RUNNING); var secondTestTask = contextRuntime.TaskRuntime.Value.Task as TestTask; @@ -291,6 +303,8 @@ namespace Org.Apache.REEF.Evaluator.Tests secondTestTask.StopEvent.Wait(); Assert.False(contextRuntime.GetTaskStatus().IsPresent()); secondTestTask.DisposedEvent.Wait(); + + taskThread.Join(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs index e38b3a8..e8ece72 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs @@ -210,20 +210,20 @@ namespace Org.Apache.REEF.Evaluator.Tests [Fact] [Trait("Priority", "0")] [Trait("Category", "Unit")] - public async Task TestServiceTaskEventHandlersTriggered() + public void TestServiceTaskEventHandlersTriggered() { - await RunTasksAndVerifyEventHandlers(1); + RunTasksAndVerifyEventHandlers(1); } [Fact] [Trait("Priority", "0")] [Trait("Category", "Unit")] - public async Task TestServiceTaskEventHandlersTriggeredSuccessiveTasks() + public void TestServiceTaskEventHandlersTriggeredSuccessiveTasks() { - await RunTasksAndVerifyEventHandlers(5); + RunTasksAndVerifyEventHandlers(5); } - private static async Task RunTasksAndVerifyEventHandlers(int tasksRun) + private static void 
RunTasksAndVerifyEventHandlers(int tasksRun) { var launcher = GetRootContextLauncher( GetContextConfiguration(), GetServiceConfiguration(), Optional<IConfiguration>.Of(GetTaskConfiguration())); @@ -235,7 +235,7 @@ namespace Org.Apache.REEF.Evaluator.Tests serviceInjector = rootContext.ServiceInjector; for (var i = 0; i < tasksRun; i++) { - await rootContext.StartTask(launcher.RootTaskConfig.Value); + rootContext.StartTask(launcher.RootTaskConfig.Value).Join(); } Assert.NotNull(serviceInjector); http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs index 56add09..3f452c4 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs @@ -76,12 +76,13 @@ namespace Org.Apache.REEF.Evaluator.Tests { var injector = GetInjector(); var taskRuntime = injector.GetInstance<TaskRuntime>(); - taskRuntime.RunTask(); + var taskThread = taskRuntime.RunTask(); var task = injector.GetInstance<TestTask>(); task.FinishCountdownEvent.Wait(); task.DisposeCountdownEvent.Wait(); Assert.Equal(taskRuntime.GetTaskState(), TaskState.Done); Assert.True(taskRuntime.HasEnded()); + taskThread.Join(); } /// <summary> @@ -92,11 +93,12 @@ namespace Org.Apache.REEF.Evaluator.Tests { var injector = GetInjector(typeof(ExceptionAction)); var taskRuntime = injector.GetInstance<TaskRuntime>(); - taskRuntime.RunTask(); + var taskThread = taskRuntime.RunTask(); var task = injector.GetInstance<TestTask>(); task.DisposeCountdownEvent.Wait(); Assert.Equal(taskRuntime.GetTaskState(), TaskState.Failed); Assert.True(taskRuntime.HasEnded()); + taskThread.Join(); } /// <summary> @@ -118,7 +120,8 @@ namespace Org.Apache.REEF.Evaluator.Tests Assert.Equal(statusProto.state, 
State.INIT); Assert.Equal(taskRuntime.GetTaskState(), TaskState.Init); - taskRuntime.RunTask(); + var taskThread = taskRuntime.RunTask(); + Assert.Equal(taskRuntime.GetStatusProto().state, State.RUNNING); Assert.Equal(taskRuntime.GetTaskState(), TaskState.Running); @@ -138,6 +141,8 @@ namespace Org.Apache.REEF.Evaluator.Tests Assert.Equal(taskRuntime.GetStatusProto().state, State.DONE); Assert.Equal(taskRuntime.GetTaskState(), TaskState.Done); + + taskThread.Join(); } /// <summary> @@ -163,7 +168,7 @@ namespace Org.Apache.REEF.Evaluator.Tests throw new Exception("Event handler is not expected to be null."); } - taskRuntime.RunTask(); + var taskThread = taskRuntime.RunTask(); Assert.True(testTaskEventStartHandler.StartInvoked.IsPresent()); Assert.Equal(testTaskEventStartHandler.StartInvoked.Value, taskId); @@ -190,6 +195,8 @@ namespace Org.Apache.REEF.Evaluator.Tests Assert.True(ReferenceEquals(testTaskEventStartHandler, testTaskEventStopHandler)); Assert.True(testTaskEventStopHandler.StopInvoked.IsPresent()); Assert.Equal(testTaskEventStopHandler.StopInvoked.Value, taskId); + + taskThread.Join(); } /// <summary> @@ -205,7 +212,7 @@ namespace Org.Apache.REEF.Evaluator.Tests var injector = GetInjector(typeof(ExceptionAction), contextId, taskId); var taskRuntime = injector.GetInstance<TaskRuntime>(); - taskRuntime.RunTask(); + var taskThread = taskRuntime.RunTask(); var task = injector.GetInstance<TestTask>(); task.FinishCountdownEvent.Wait(); @@ -221,6 +228,8 @@ namespace Org.Apache.REEF.Evaluator.Tests Assert.True(testTaskEventStopHandler.StopInvoked.IsPresent()); Assert.Equal(testTaskEventStopHandler.StopInvoked.Value, taskId); + + taskThread.Join(); } /// <summary> @@ -234,7 +243,7 @@ namespace Org.Apache.REEF.Evaluator.Tests var injector = GetInjector(typeof(CountDownAction), contextId, taskId); var taskRuntime = injector.GetInstance<TaskRuntime>(); - taskRuntime.RunTask(); + var taskThread = taskRuntime.RunTask(); var taskInterface = 
injector.GetInstance<ITask>(); Assert.True(taskInterface is TestTask); @@ -251,6 +260,8 @@ namespace Org.Apache.REEF.Evaluator.Tests task.DisposeCountdownEvent.Wait(); Assert.True(task.SuspendInvoked); + + taskThread.Join(); } /// <summary> @@ -264,7 +275,7 @@ namespace Org.Apache.REEF.Evaluator.Tests var injector = GetInjector(contextId, taskId); var taskRuntime = injector.GetInstance<TaskRuntime>(); - taskRuntime.RunTask(); + var taskThread = taskRuntime.RunTask(); var taskInterface = injector.GetInstance<ITask>(); Assert.True(taskInterface is TestTask); @@ -290,6 +301,8 @@ namespace Org.Apache.REEF.Evaluator.Tests taskRuntime.Suspend(null); Assert.False(task.SuspendInvoked); + + taskThread.Join(); } /// <summary> @@ -303,7 +316,7 @@ namespace Org.Apache.REEF.Evaluator.Tests var injector = GetInjector(typeof(ExceptionAction), contextId, taskId); var taskRuntime = injector.GetInstance<TaskRuntime>(); - taskRuntime.RunTask(); + var taskThread = taskRuntime.RunTask(); var task = injector.GetInstance<TestTask>(); @@ -322,6 +335,8 @@ namespace Org.Apache.REEF.Evaluator.Tests taskRuntime.Suspend(null); Assert.False(task.SuspendInvoked); + + taskThread.Join(); } private static IInjector GetInjector(string contextId = "contextId", string taskId = "taskId")
