Repository: reef
Updated Branches:
  refs/heads/master b5386f77e -> b45c29c5e


[REEF-1209] Implement SuspendTask on the Evaluator

JIRA:
  [REEF-1209](https://issues.apache.org/jira/browse/REEF-1209)

This closes #847


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b45c29c5
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b45c29c5
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b45c29c5

Branch: refs/heads/master
Commit: b45c29c5e14c55781ee0e22627985e5626db8e23
Parents: b5386f7
Author: Andrew Chung <[email protected]>
Authored: Thu Feb 18 15:26:48 2016 -0800
Committer: Julia Wang <[email protected]>
Committed: Wed Feb 24 14:20:35 2016 -0800

----------------------------------------------------------------------
 .../Org.Apache.REEF.Common.csproj               |   2 +
 .../Runtime/Evaluator/Task/SuspendEventImpl.cs  |  24 +--
 .../Runtime/Evaluator/Task/TaskRuntime.cs       |  17 ++-
 .../Tasks/Defaults/DefaultSuspendHandler.cs     |  51 +++++++
 .../Tasks/Events/ISuspendEvent.cs               |   9 ++
 .../Exceptions/TaskSuspendHandlerException.cs   |  43 ++++++
 .../Tasks/TaskConfigurationOptions.cs           |   3 +-
 .../TaskRuntimeTests.cs                         | 150 ++++++++++++++++++-
 8 files changed, 276 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj 
b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 22f5408..de1663f 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -160,6 +160,7 @@ under the License.
     <Compile Include="Services\ServiceConfiguration.cs" />
     <Compile Include="Tasks\Defaults\DefaultDriverConnectionMessageHandler.cs" 
/>
     <Compile Include="Tasks\Defaults\DefaultDriverMessageHandler.cs" />
+    <Compile Include="Tasks\Defaults\DefaultSuspendHandler.cs" />
     <Compile Include="Tasks\Defaults\DefaultTaskMessageSource.cs" />
     <Compile Include="Tasks\DriverConnectionState.cs" />
     <Compile Include="Tasks\DriverConnectionMessageImpl.cs" />
@@ -168,6 +169,7 @@ under the License.
     <Compile Include="Tasks\Events\ISuspendEvent.cs" />
     <Compile Include="Tasks\Events\ITaskStart.cs" />
     <Compile Include="Tasks\Events\ITaskStop.cs" />
+    <Compile Include="Tasks\Exceptions\TaskSuspendHandlerException.cs" />
     <Compile Include="Tasks\IDriverConnectionMessageHandler.cs" />
     <Compile Include="Tasks\IDriverMessageHandler.cs" />
     <Compile Include="Tasks\IDriverConnectionMessage.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs
index 2157d46..5033ecf 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs
@@ -20,27 +20,17 @@ using Org.Apache.REEF.Utilities;
 
 namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
 {
-    internal sealed class SuspendEventImpl : ICloseEvent
+    /// <summary>
+    /// The implementation of an event from the driver signaling 
+    /// that a task should be suspended.
+    /// </summary>
+    internal sealed class SuspendEventImpl : ISuspendEvent
     {
-        public SuspendEventImpl()
-        {
-            Value = Optional<byte[]>.Empty();
-        }
-
         public SuspendEventImpl(byte[] bytes)
         {
-            Value = Optional<byte[]>.OfNullable(bytes);
-        }
-
-        public Optional<byte[]> Value
-        {
-            get { return Value; }
-            set { value = Value; }
+            Message = Optional<byte[]>.OfNullable(bytes);
         }
 
-        public override string ToString()
-        {
-            return "SuspendEvent{value=" + Value.ToString() + "}";
-        }
+        public Optional<byte[]> Message { get; private set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 6fa135b..ca86f32 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -22,8 +22,10 @@ using System.Threading;
 using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Common.Tasks.Exceptions;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities;
 using Org.Apache.REEF.Utilities.Attributes;
@@ -39,6 +41,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         private readonly Optional<IDriverConnectionMessageHandler> 
_driverConnectionMessageHandler;
         private readonly Optional<IDriverMessageHandler> _driverMessageHandler;
         private readonly ITask _userTask;
+        private readonly IInjectionFuture<IObserver<ISuspendEvent>> 
_suspendHandlerFuture;
         private int _taskRan = 0;
 
         [Inject]
@@ -46,12 +49,14 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             ITask userTask,
             IDriverMessageHandler driverMessageHandler, 
             IDriverConnectionMessageHandler driverConnectionMessageHandler,
-            TaskStatus taskStatus)
+            TaskStatus taskStatus,
+            [Parameter(typeof(TaskConfigurationOptions.SuspendHandler))] 
IInjectionFuture<IObserver<ISuspendEvent>> suspendHandlerFuture)
         {
             _currentStatus = taskStatus;
             _driverMessageHandler = 
Optional<IDriverMessageHandler>.Of(driverMessageHandler);
             _driverConnectionMessageHandler = 
Optional<IDriverConnectionMessageHandler>.Of(driverConnectionMessageHandler);
             _userTask = userTask;
+            _suspendHandlerFuture = suspendHandlerFuture;
         }
 
         /// <summary>
@@ -274,7 +279,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         public void OnNext(ISuspendEvent value)
         {
             Logger.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)");
-            //// TODO: send a heartbeat
+            try
+            {
+                _suspendHandlerFuture.Get().OnNext(value);
+            }
+            catch (Exception ex)
+            {
+                var suspendEx = new TaskSuspendHandlerException("Unable to 
suspend task.", ex);
+                Utilities.Diagnostics.Exceptions.CaughtAndThrow(suspendEx, 
Level.Error, Logger);
+            }
         }
 
         public void OnNext(IDriverMessage value)

http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultSuspendHandler.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultSuspendHandler.cs 
b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultSuspendHandler.cs
new file mode 100644
index 0000000..18bc5c1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultSuspendHandler.cs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Tasks.Defaults
+{
+    /// <summary>
+    /// A default handler for an event from the driver signaling the suspension
+    /// of a task. Throws an exception by default, since a task should not 
have received
+    /// a suspension event if the handler is not bound explicitly.
+    /// </summary>
+    internal sealed class DefaultSuspendHandler : IObserver<ISuspendEvent>
+    {
+        [Inject]
+        private DefaultSuspendHandler()
+        {
+        }
+
+        public void OnNext(ISuspendEvent value)
+        {
+            throw new Exception("No handler for suspend registered.");
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs 
b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs
index 5fef335..2ad9326 100644
--- a/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs
@@ -15,9 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using Org.Apache.REEF.Utilities;
+
 namespace Org.Apache.REEF.Common.Tasks.Events
 {
+    /// <summary>
+    /// An event from the driver signaling that a task should be suspended.
+    /// </summary>
     public interface ISuspendEvent
     {
+        /// <summary>
+        /// The optional suspension message from the driver.
+        /// </summary>
+        Optional<byte[]> Message { get; } 
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs
 
b/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs
new file mode 100644
index 0000000..717c399
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+
+namespace Org.Apache.REEF.Common.Tasks.Exceptions
+{
+    /// <summary>
+    /// An exception that is thrown when the task suspension event
+    /// handler fails, e.g. when no handler is bound and the default throws.
+    /// </summary>
+    internal sealed class TaskSuspendHandlerException : Exception
+    {
+        public TaskSuspendHandlerException(string message)
+            : base(message)
+        {
+        }
+
+        public TaskSuspendHandlerException(Exception innerException)
+            : base(innerException.Message, innerException)
+        {
+        }
+
+        public TaskSuspendHandlerException(string message, Exception 
innerException)
+            : base(message, innerException)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs 
b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs
index 3842fb7..30dce1a 100644
--- a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Generic;
+using Org.Apache.REEF.Common.Tasks.Defaults;
 using Org.Apache.REEF.Common.Tasks.Events;
 using Org.Apache.REEF.Tang.Annotations;
 
@@ -54,7 +55,7 @@ namespace Org.Apache.REEF.Common.Tasks
         {
         }
 
-        [NamedParameter(documentation: "The event handler that receives the 
suspend event.")]
+        [NamedParameter(documentation: "The event handler that receives the 
suspend event.", defaultClass: typeof(DefaultSuspendHandler))]
         public class SuspendHandler : Name<IObserver<ISuspendEvent>>
         {
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs 
b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
index 0435357..56add09 100644
--- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
+++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
@@ -26,6 +26,7 @@ using Org.Apache.REEF.Common.Runtime.Evaluator;
 using Org.Apache.REEF.Common.Runtime.Evaluator.Task;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
@@ -123,7 +124,15 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
             injector.GetInstance<CountDownAction>().CountdownEvent.Signal();
 
-            var task = injector.GetInstance<TestTask>();
+            var taskInterface = injector.GetInstance<ITask>();
+            Assert.True(taskInterface is TestTask);
+
+            var task = taskInterface as TestTask;
+            if (task == null)
+            {
+                throw new Exception("Task is expected to be an instance of 
TestTask.");
+            }
+
             task.FinishCountdownEvent.Wait();
             task.DisposeCountdownEvent.Wait();
 
@@ -214,6 +223,107 @@ namespace Org.Apache.REEF.Evaluator.Tests
             Assert.Equal(testTaskEventStopHandler.StopInvoked.Value, taskId);
         }
 
+        /// <summary>
+        /// Tests that suspend ends the task and invokes the right handler.
+        /// </summary>
+        [Fact]
+        public void TestSuspendTask()
+        {
+            var contextId = Guid.NewGuid().ToString();
+            var taskId = Guid.NewGuid().ToString();
+
+            var injector = GetInjector(typeof(CountDownAction), contextId, 
taskId);
+            var taskRuntime = injector.GetInstance<TaskRuntime>();
+            taskRuntime.RunTask();
+
+            var taskInterface = injector.GetInstance<ITask>();
+            Assert.True(taskInterface is TestTask);
+
+            var task = taskInterface as TestTask;
+            if (task == null)
+            {
+                throw new Exception("Task is expected to be an instance of 
TestTask.");
+            }
+
+            taskRuntime.Suspend(null);
+
+            task.FinishCountdownEvent.Wait();
+            task.DisposeCountdownEvent.Wait();
+
+            Assert.True(task.SuspendInvoked);
+        }
+
+        /// <summary>
+        /// Tests that suspend is not invoked after task is done.
+        /// </summary>
+        [Fact]
+        public void TestSuspendTaskAfterDoneIsNotSuspended()
+        {
+            var contextId = Guid.NewGuid().ToString();
+            var taskId = Guid.NewGuid().ToString();
+
+            var injector = GetInjector(contextId, taskId);
+            var taskRuntime = injector.GetInstance<TaskRuntime>();
+            taskRuntime.RunTask();
+
+            var taskInterface = injector.GetInstance<ITask>();
+            Assert.True(taskInterface is TestTask);
+
+            var task = taskInterface as TestTask;
+            if (task == null)
+            {
+                throw new Exception("Task is expected to be an instance of 
TestTask.");
+            }
+
+            task.FinishCountdownEvent.Wait();
+            task.DisposeCountdownEvent.Wait();
+
+            var stopHandlers = 
injector.GetNamedInstance<TaskConfigurationOptions.StopHandlers, 
ISet<IObserver<ITaskStop>>>();
+
+            var testTaskEventStopHandler = stopHandlers.Single() as 
TestTaskEventHandler;
+            if (testTaskEventStopHandler == null)
+            {
+                throw new Exception("Event handler is not expected to be 
null.");
+            }
+
+            Assert.Equal(testTaskEventStopHandler.StopInvoked.Value, taskId);
+
+            taskRuntime.Suspend(null);
+            Assert.False(task.SuspendInvoked);
+        }
+
+        /// <summary>
+        /// Tests that suspend is not invoked after task has failed.
+        /// </summary>
+        [Fact]
+        public void TestSuspendTaskAfterFailureIsNotSuspended()
+        {
+            var contextId = Guid.NewGuid().ToString();
+            var taskId = Guid.NewGuid().ToString();
+
+            var injector = GetInjector(typeof(ExceptionAction), contextId, 
taskId);
+            var taskRuntime = injector.GetInstance<TaskRuntime>();
+            taskRuntime.RunTask();
+
+            var task = injector.GetInstance<TestTask>();
+
+            task.DisposeCountdownEvent.Wait();
+
+            var stopHandlers = 
injector.GetNamedInstance<TaskConfigurationOptions.StopHandlers, 
ISet<IObserver<ITaskStop>>>();
+
+            var testTaskEventStopHandler = stopHandlers.Single() as 
TestTaskEventHandler;
+            if (testTaskEventStopHandler == null)
+            {
+                throw new Exception("Event handler is not expected to be 
null.");
+            }
+
+            Assert.True(testTaskEventStopHandler.StopInvoked.IsPresent());
+            Assert.Equal(taskRuntime.GetTaskState(), TaskState.Failed);
+
+            taskRuntime.Suspend(null);
+            Assert.False(task.SuspendInvoked);
+        }
+
         private static IInjector GetInjector(string contextId = "contextId", 
string taskId = "taskId")
         {
             return GetInjector(typeof(DefaultAction), contextId, taskId);
@@ -233,6 +343,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
                 .Set(TaskConfiguration.OnTaskStart, 
GenericType<TestTaskEventHandler>.Class)
                 .Set(TaskConfiguration.OnTaskStop, 
GenericType<TestTaskEventHandler>.Class)
                 .Set(TaskConfiguration.Task, GenericType<TestTask>.Class)
+                .Set(TaskConfiguration.OnSuspend, GenericType<TestTask>.Class)
                 .Build();
             
             var actionConfig = confBuilder
@@ -279,7 +390,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
             }
         }
 
-        private sealed class TestTask : ITask
+        private sealed class TestTask : ITask, IObserver<ISuspendEvent>
         {
             private readonly IAction _action;
 
@@ -291,6 +402,8 @@ namespace Org.Apache.REEF.Evaluator.Tests
                 _action = action;
             }
 
+            public bool SuspendInvoked { get; private set; }
+
             public CountdownEvent FinishCountdownEvent { get; private set; }
 
             public CountdownEvent DisposeCountdownEvent { get; private set; }
@@ -313,11 +426,29 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
                 return null;
             }
+
+            public void OnNext(ISuspendEvent value)
+            {
+                _action.OnSuspend();
+                SuspendInvoked = true;
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
         }
 
         private interface IAction
         {
-            Action Value { get; } 
+            Action Value { get; }
+
+            void OnSuspend();
         }
 
         private sealed class DefaultAction : IAction
@@ -335,6 +466,10 @@ namespace Org.Apache.REEF.Evaluator.Tests
                     return () => { };
                 }
             }
+
+            public void OnSuspend()
+            {
+            }
         }
 
         private sealed class ExceptionAction : IAction
@@ -354,6 +489,10 @@ namespace Org.Apache.REEF.Evaluator.Tests
                     };
                 } 
             }
+
+            public void OnSuspend()
+            {
+            }
         }
 
         private sealed class CountDownAction : IAction
@@ -375,6 +514,11 @@ namespace Org.Apache.REEF.Evaluator.Tests
                 }
             }
 
+            public void OnSuspend()
+            {
+                CountdownEvent.Signal();
+            }
+
             public CountdownEvent CountdownEvent { get; private set; }
         }
     }

Reply via email to