Repository: reef
Updated Branches:
  refs/heads/master 23f5770da -> a304daac9


[REEF-1377] Move from C# Tasks to Threads to run user's Task

This
  * moves from Tasks to Threads,
  * switches the behavior of tests to reflect the move and
  * marks HeartBeatManager and TaskStatus as ThreadSafe.

JIRA:
  [REEF-1377](https://issues.apache.org/jira/browse/REEF-1377)

Pull Request:
  This closes #984


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/a304daac
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/a304daac
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/a304daac

Branch: refs/heads/master
Commit: a304daac92e17abb65ce1ccd25522928a402d6ea
Parents: 23f5770
Author: Andrew Chung <[email protected]>
Authored: Mon May 2 11:40:44 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon May 2 14:34:01 2016 -0700

----------------------------------------------------------------------
 .../Runtime/Evaluator/Context/ContextRuntime.cs |  3 +-
 .../Runtime/Evaluator/HeartBeatManager.cs       |  2 +
 .../Runtime/Evaluator/Task/TaskRuntime.cs       | 87 +++++++-------------
 .../Runtime/Evaluator/Task/TaskStatus.cs        |  2 +
 .../ContextRuntimeTests.cs                      | 32 +++++--
 .../EvaluatorServiceTests.cs                    | 12 +--
 .../TaskRuntimeTests.cs                         | 31 +++++--
 7 files changed, 86 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
index cd4b2d3..ffb5b3c 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
@@ -19,6 +19,7 @@ using System;
 using System.Collections.Generic;
 using System.Globalization;
 using System.Linq;
+using System.Threading;
 using Org.Apache.REEF.Common.Context;
 using Org.Apache.REEF.Common.Events;
 using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
@@ -266,7 +267,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// Launches an Task on this context.
         /// </summary>
         /// <param name="taskConfiguration"></param>
-        public System.Threading.Tasks.Task StartTask(IConfiguration 
taskConfiguration)
+        public Thread StartTask(IConfiguration taskConfiguration)
         {
             lock (_contextLifeCycle)
             {

http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
index becdc7a..db422b1 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
@@ -29,6 +29,7 @@ using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
 using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
@@ -37,6 +38,7 @@ using Org.Apache.REEF.Wake.Time.Event;
 
 namespace Org.Apache.REEF.Common.Runtime.Evaluator
 {
+    [ThreadSafe]
     internal sealed class HeartBeatManager : IHeartBeatManager
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(HeartBeatManager));

http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 970a36c..654706a 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -82,7 +82,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         /// <summary>
         /// Runs the task asynchronously.
         /// </summary>
-        public System.Threading.Tasks.Task RunTask()
+        public Thread RunTask()
         {
             if (Interlocked.Exchange(ref _taskRan, 1) != 0)
             {
@@ -93,77 +93,46 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             // Send heartbeat such that user receives a TaskRunning message.
             _currentStatus.SetRunning();
 
-            return System.Threading.Tasks.Task.Run(() =>
+            var taskThread = new Thread(() =>
             {
-                Logger.Log(Level.Info, "Calling into user's task.");
-                return _userTask.Call(null);
-            }).ContinueWith((System.Threading.Tasks.Task<byte[]> runTask) =>
+                try
                 {
-                    try
-                    {
-                        // Task failed.
-                        if (runTask.IsFaulted)
-                        {
-                            OnTaskFailure(runTask);
-                            return;
-                        }
-
-                        if (runTask.IsCanceled)
-                        {
-                            Logger.Log(Level.Error,
-                                string.Format(CultureInfo.InvariantCulture, 
"Task failed caused by System.Threading.Task cancellation"));
-                            OnTaskFailure(runTask);
-                            return;
-                        }
+                    Logger.Log(Level.Verbose, "Calling into user's task.");
+                    var result = _userTask.Call(null);
+                    Logger.Log(Level.Info, "Task Call Finished");
+                    _currentStatus.SetResult(result);
 
-                        // Task completed.
-                        var result = runTask.Result;
-                        Logger.Log(Level.Info, "Task Call Finished");
-                        _currentStatus.SetResult(result);
+                    const Level resultLogLevel = Level.Verbose;
 
-                        const Level resultLogLevel = Level.Verbose;
-
-                        if (Logger.CustomLevel >= resultLogLevel && result != 
null && result.Length > 0)
-                        {
-                            Logger.Log(resultLogLevel,
-                                "Task running result:\r\n" + 
System.Text.Encoding.Default.GetString(result));
-                        }
-                    }
-                    catch (Exception)
+                    if (Logger.CustomLevel >= resultLogLevel && result != null 
&& result.Length > 0)
                     {
-                        // TODO[JIRA REEF-1364]: Properly handle Exceptions 
and send a message to the Driver.
-                        Logger.Log(Level.Error, "Received uncaught System 
Exception, force shutting down the Evaluator.");
-
-                        Environment.Exit(1);
+                        Logger.Log(resultLogLevel,
+                            "Task running result:\r\n" + 
System.Text.Encoding.Default.GetString(result));
                     }
-                    finally
+                }
+                catch (Exception e)
+                {
+                    _currentStatus.SetException(e);
+                }
+                finally
+                {
+                    try
                     {
                         if (_userTask != null)
                         {
                             _userTask.Dispose();
                         }
-
-                        runTask.Dispose();
                     }
-                });
-        }
+                    catch (Exception e)
+                    {
+                        Utilities.Diagnostics.Exceptions.Caught(
+                            e, Level.Error, "Exception in disposing Task but 
ignoring as Task has already completed.", Logger);
+                    }
+                }
+            });
 
-        /// <summary>
-        /// Sets the current status of the Task with the Exception it failed 
with.
-        /// </summary>
-        private void OnTaskFailure(System.Threading.Tasks.Task runTask)
-        {
-            if (runTask.Exception == null)
-            {
-                _currentStatus.SetException(new SystemException("Task failed 
without an Exception."));
-            }
-            else
-            {
-                var aggregateException = runTask.Exception.Flatten();
-                _currentStatus.SetException(
-                    aggregateException.InnerExceptions.Count == 1 ?
-                    aggregateException.InnerExceptions.First() : 
aggregateException);
-            }
+            taskThread.Start();
+            return taskThread;
         }
 
         public TaskState GetTaskState()

http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
index f59184c..3e8a128 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
@@ -28,10 +28,12 @@ using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
 {
+    [ThreadSafe]
     internal sealed class TaskStatus
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(TaskStatus));

http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs 
b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs
index 484f5a4..8fd43b4 100644
--- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs
+++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs
@@ -175,11 +175,12 @@ namespace Org.Apache.REEF.Evaluator.Tests
                     .Set(TaskConfiguration.Identifier, "ID")
                     .Build();
 
+                Thread taskThread = null;
                 try
                 {
                     var hbMgr = Substitute.For<IHeartBeatManager>();
                     
contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class,
 hbMgr);
-                    contextRuntime.StartTask(taskConfig);
+                    taskThread = contextRuntime.StartTask(taskConfig);
 
                     Assert.True(contextRuntime.TaskRuntime.IsPresent());
                     Assert.True(contextRuntime.GetTaskStatus().IsPresent());
@@ -200,6 +201,11 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
                     testTask.CountDownEvent.Signal();
                     testTask.DisposedEvent.Wait();
+
+                    if (taskThread != null)
+                    {
+                        taskThread.Join();
+                    }
                 }
             }
         }
@@ -207,7 +213,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
         [Fact]
         [Trait("Priority", "0")]
         [Trait("Category", "Unit")]
-        public async Task TestUnableToRunMultipleTasksAtTheSameTime()
+        public void TestUnableToRunMultipleTasksAtTheSameTime()
         {
             var serviceInjector = TangFactory.GetTang().NewInjector();
             var contextConfig = GetSimpleContextConfiguration();
@@ -219,18 +225,19 @@ namespace Org.Apache.REEF.Evaluator.Tests
                     .Set(TaskConfiguration.Identifier, "ID")
                     .Build();
 
+                Thread taskThread = null;
+
                 try
                 {
                     var hbMgr = Substitute.For<IHeartBeatManager>();
                     
contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class,
 hbMgr);
-                    var t = contextRuntime.StartTask(taskConfig);
+                    taskThread = contextRuntime.StartTask(taskConfig);
 
                     Assert.True(contextRuntime.TaskRuntime.IsPresent());
                     Assert.True(contextRuntime.GetTaskStatus().IsPresent());
                     Assert.Equal(contextRuntime.GetTaskStatus().Value.state, 
State.RUNNING);
 
-                    await Assert.ThrowsAsync<InvalidOperationException>(
-                        () => contextRuntime.StartTask(taskConfig));
+                    Assert.Throws<InvalidOperationException>(() => 
contextRuntime.StartTask(taskConfig));
                 }
                 finally
                 {
@@ -241,7 +248,10 @@ namespace Org.Apache.REEF.Evaluator.Tests
                     }
 
                     testTask.CountDownEvent.Signal();
-                    testTask.DisposedEvent.Wait();
+                    if (taskThread != null)
+                    {
+                        taskThread.Join();
+                    }
                 }
             }
         }
@@ -264,7 +274,8 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
                 var hbMgr = Substitute.For<IHeartBeatManager>();
                 
contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class,
 hbMgr);
-                contextRuntime.StartTask(taskConfig);
+
+                var taskThread = contextRuntime.StartTask(taskConfig);
                 var testTask = contextRuntime.TaskRuntime.Value.Task as 
TestTask;
                 if (testTask == null)
                 {
@@ -274,9 +285,10 @@ namespace Org.Apache.REEF.Evaluator.Tests
                 testTask.CountDownEvent.Signal();
                 testTask.StopEvent.Wait();
                 Assert.False(contextRuntime.GetTaskStatus().IsPresent());
-                testTask.DisposedEvent.Wait();
 
-                contextRuntime.StartTask(taskConfig);
+                taskThread.Join();
+
+                taskThread = contextRuntime.StartTask(taskConfig);
                 Assert.Equal(contextRuntime.GetTaskStatus().Value.state, 
State.RUNNING);
 
                 var secondTestTask = contextRuntime.TaskRuntime.Value.Task as 
TestTask;
@@ -291,6 +303,8 @@ namespace Org.Apache.REEF.Evaluator.Tests
                 secondTestTask.StopEvent.Wait();
                 Assert.False(contextRuntime.GetTaskStatus().IsPresent());
                 secondTestTask.DisposedEvent.Wait();
+
+                taskThread.Join();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs 
b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs
index e38b3a8..e8ece72 100644
--- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs
+++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs
@@ -210,20 +210,20 @@ namespace Org.Apache.REEF.Evaluator.Tests
         [Fact]
         [Trait("Priority", "0")]
         [Trait("Category", "Unit")]
-        public async Task TestServiceTaskEventHandlersTriggered()
+        public void TestServiceTaskEventHandlersTriggered()
         {
-            await RunTasksAndVerifyEventHandlers(1);
+            RunTasksAndVerifyEventHandlers(1);
         }
 
         [Fact]
         [Trait("Priority", "0")]
         [Trait("Category", "Unit")]
-        public async Task 
TestServiceTaskEventHandlersTriggeredSuccessiveTasks()
+        public void TestServiceTaskEventHandlersTriggeredSuccessiveTasks()
         {
-            await RunTasksAndVerifyEventHandlers(5);
+            RunTasksAndVerifyEventHandlers(5);
         }
 
-        private static async Task RunTasksAndVerifyEventHandlers(int tasksRun)
+        private static void RunTasksAndVerifyEventHandlers(int tasksRun)
         {
             var launcher = GetRootContextLauncher(
                GetContextConfiguration(), GetServiceConfiguration(), 
Optional<IConfiguration>.Of(GetTaskConfiguration()));
@@ -235,7 +235,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
                 serviceInjector = rootContext.ServiceInjector;
                 for (var i = 0; i < tasksRun; i++)
                 {
-                    await rootContext.StartTask(launcher.RootTaskConfig.Value);
+                    
rootContext.StartTask(launcher.RootTaskConfig.Value).Join();
                 }
 
                 Assert.NotNull(serviceInjector);

http://git-wip-us.apache.org/repos/asf/reef/blob/a304daac/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs 
b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
index 56add09..3f452c4 100644
--- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
+++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
@@ -76,12 +76,13 @@ namespace Org.Apache.REEF.Evaluator.Tests
         {
             var injector = GetInjector();
             var taskRuntime = injector.GetInstance<TaskRuntime>();
-            taskRuntime.RunTask();
+            var taskThread = taskRuntime.RunTask();
             var task = injector.GetInstance<TestTask>();
             task.FinishCountdownEvent.Wait();
             task.DisposeCountdownEvent.Wait();
             Assert.Equal(taskRuntime.GetTaskState(), TaskState.Done);
             Assert.True(taskRuntime.HasEnded());
+            taskThread.Join();
         }
 
         /// <summary>
@@ -92,11 +93,12 @@ namespace Org.Apache.REEF.Evaluator.Tests
         {
             var injector = GetInjector(typeof(ExceptionAction));
             var taskRuntime = injector.GetInstance<TaskRuntime>();
-            taskRuntime.RunTask();
+            var taskThread = taskRuntime.RunTask();
             var task = injector.GetInstance<TestTask>();
             task.DisposeCountdownEvent.Wait();
             Assert.Equal(taskRuntime.GetTaskState(), TaskState.Failed);
             Assert.True(taskRuntime.HasEnded());
+            taskThread.Join();
         }
 
         /// <summary>
@@ -118,7 +120,8 @@ namespace Org.Apache.REEF.Evaluator.Tests
             Assert.Equal(statusProto.state, State.INIT);
             Assert.Equal(taskRuntime.GetTaskState(), TaskState.Init);
 
-            taskRuntime.RunTask();
+            var taskThread = taskRuntime.RunTask();
+
             Assert.Equal(taskRuntime.GetStatusProto().state, State.RUNNING);
             Assert.Equal(taskRuntime.GetTaskState(), TaskState.Running);
 
@@ -138,6 +141,8 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
             Assert.Equal(taskRuntime.GetStatusProto().state, State.DONE);
             Assert.Equal(taskRuntime.GetTaskState(), TaskState.Done);
+
+            taskThread.Join();
         }
 
         /// <summary>
@@ -163,7 +168,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
                 throw new Exception("Event handler is not expected to be 
null.");
             }
 
-            taskRuntime.RunTask();
+            var taskThread = taskRuntime.RunTask();
 
             Assert.True(testTaskEventStartHandler.StartInvoked.IsPresent());
             Assert.Equal(testTaskEventStartHandler.StartInvoked.Value, taskId);
@@ -190,6 +195,8 @@ namespace Org.Apache.REEF.Evaluator.Tests
             Assert.True(ReferenceEquals(testTaskEventStartHandler, 
testTaskEventStopHandler));
             Assert.True(testTaskEventStopHandler.StopInvoked.IsPresent());
             Assert.Equal(testTaskEventStopHandler.StopInvoked.Value, taskId);
+
+            taskThread.Join();
         }
 
         /// <summary>
@@ -205,7 +212,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
             var injector = GetInjector(typeof(ExceptionAction), contextId, 
taskId);
             var taskRuntime = injector.GetInstance<TaskRuntime>();
 
-            taskRuntime.RunTask();
+            var taskThread = taskRuntime.RunTask();
 
             var task = injector.GetInstance<TestTask>();
             task.FinishCountdownEvent.Wait();
@@ -221,6 +228,8 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
             Assert.True(testTaskEventStopHandler.StopInvoked.IsPresent());
             Assert.Equal(testTaskEventStopHandler.StopInvoked.Value, taskId);
+
+            taskThread.Join();
         }
 
         /// <summary>
@@ -234,7 +243,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
             var injector = GetInjector(typeof(CountDownAction), contextId, 
taskId);
             var taskRuntime = injector.GetInstance<TaskRuntime>();
-            taskRuntime.RunTask();
+            var taskThread = taskRuntime.RunTask();
 
             var taskInterface = injector.GetInstance<ITask>();
             Assert.True(taskInterface is TestTask);
@@ -251,6 +260,8 @@ namespace Org.Apache.REEF.Evaluator.Tests
             task.DisposeCountdownEvent.Wait();
 
             Assert.True(task.SuspendInvoked);
+
+            taskThread.Join();
         }
 
         /// <summary>
@@ -264,7 +275,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
             var injector = GetInjector(contextId, taskId);
             var taskRuntime = injector.GetInstance<TaskRuntime>();
-            taskRuntime.RunTask();
+            var taskThread = taskRuntime.RunTask();
 
             var taskInterface = injector.GetInstance<ITask>();
             Assert.True(taskInterface is TestTask);
@@ -290,6 +301,8 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
             taskRuntime.Suspend(null);
             Assert.False(task.SuspendInvoked);
+
+            taskThread.Join();
         }
 
         /// <summary>
@@ -303,7 +316,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
             var injector = GetInjector(typeof(ExceptionAction), contextId, 
taskId);
             var taskRuntime = injector.GetInstance<TaskRuntime>();
-            taskRuntime.RunTask();
+            var taskThread = taskRuntime.RunTask();
 
             var task = injector.GetInstance<TestTask>();
 
@@ -322,6 +335,8 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
             taskRuntime.Suspend(null);
             Assert.False(task.SuspendInvoked);
+
+            taskThread.Join();
         }
 
         private static IInjector GetInjector(string contextId = "contextId", 
string taskId = "taskId")

Reply via email to