Repository: reef
Updated Branches:
  refs/heads/master 895bcfa5c -> c750edf9e


[REEF-1257] Add TaskCloseHandler in TaskRunTime

  * Inject TaskCloseHandler in TaskRunTime
  * Handle Close event
  * Add DefaultCloseTaskHandler
  * Updated test cases

JIRA:
  [REEF-1257](https://issues.apache.org/jira/browse/REEF-1257)

Pull Request:
  This closes #889


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c750edf9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c750edf9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c750edf9

Branch: refs/heads/master
Commit: c750edf9e15b7608b1d3e5190c84093b646f4de7
Parents: 895bcfa
Author: Julia Wang <[email protected]>
Authored: Tue Mar 15 19:01:34 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Mar 18 16:25:34 2016 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Common.csproj               |   2 +
 .../Runtime/Evaluator/Task/TaskRuntime.cs       |  16 +-
 .../Tasks/Defaults/DefaultTaskCloseHandler.cs   |  56 +++++
 .../TaskCloseHandlerNotBoundException.cs        |  43 ++++
 .../Tasks/TaskConfigurationOptions.cs           |   2 +-
 .../Functional/Bridge/TestCloseTask.cs          | 224 +++++++++++++++++--
 6 files changed, 318 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj 
b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index dcb0c1f..307c558 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -158,6 +158,7 @@ under the License.
     <Compile Include="Runtime\Evaluator\Utils\NamedparameterAlias.cs" />
     <Compile Include="runtime\MachineStatus.cs" />
     <Compile Include="Services\ServiceConfiguration.cs" />
+    <Compile Include="Tasks\Defaults\DefaultTaskCloseHandler.cs" />
     <Compile Include="Tasks\Defaults\DefaultDriverConnectionMessageHandler.cs" 
/>
     <Compile Include="Tasks\Defaults\DefaultDriverMessageHandler.cs" />
     <Compile Include="Tasks\Defaults\DefaultSuspendHandler.cs" />
@@ -169,6 +170,7 @@ under the License.
     <Compile Include="Tasks\Events\ISuspendEvent.cs" />
     <Compile Include="Tasks\Events\ITaskStart.cs" />
     <Compile Include="Tasks\Events\ITaskStop.cs" />
+    <Compile Include="Tasks\Exceptions\TaskCloseHandlerNotBoundException.cs" />
     <Compile Include="Tasks\Exceptions\TaskSuspendHandlerException.cs" />
     <Compile Include="Tasks\IDriverConnectionMessageHandler.cs" />
     <Compile Include="Tasks\IDriverMessageHandler.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index dffae22..5cc22ec 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -16,17 +16,15 @@
 // under the License.
 
 using System;
-using System.Collections.Generic;
 using System.Globalization;
+using System.Text;
 using System.Threading;
 using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
 using Org.Apache.REEF.Common.Tasks.Exceptions;
 using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
-using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
@@ -42,6 +40,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         private readonly Optional<IDriverMessageHandler> _driverMessageHandler;
         private readonly ITask _userTask;
         private readonly IInjectionFuture<IObserver<ISuspendEvent>> 
_suspendHandlerFuture;
+        private readonly IInjectionFuture<IObserver<ICloseEvent>> 
_closeHandlerFuture;
         private int _taskRan = 0;
 
         [Inject]
@@ -50,13 +49,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             IDriverMessageHandler driverMessageHandler, 
             IDriverConnectionMessageHandler driverConnectionMessageHandler,
             TaskStatus taskStatus,
-            [Parameter(typeof(TaskConfigurationOptions.SuspendHandler))] 
IInjectionFuture<IObserver<ISuspendEvent>> suspendHandlerFuture)
+            [Parameter(typeof(TaskConfigurationOptions.SuspendHandler))] 
IInjectionFuture<IObserver<ISuspendEvent>> suspendHandlerFuture,
+            [Parameter(typeof(TaskConfigurationOptions.CloseHandler))] 
IInjectionFuture<IObserver<ICloseEvent>> closedHandlerFuture)
         {
             _currentStatus = taskStatus;
             _driverMessageHandler = 
Optional<IDriverMessageHandler>.Of(driverMessageHandler);
             _driverConnectionMessageHandler = 
Optional<IDriverConnectionMessageHandler>.Of(driverConnectionMessageHandler);
             _userTask = userTask;
             _suspendHandlerFuture = suspendHandlerFuture;
+            _closeHandlerFuture = closedHandlerFuture;
         }
 
         public string TaskId
@@ -159,6 +160,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         public void Close(byte[] message)
         {
             Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, 
"Trying to close Task {0}", TaskId));
+
             if (_currentStatus.IsNotRunning())
             {
                 Logger.Log(Level.Warning, 
string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in 
{0} state. Ignored.", _currentStatus.State));
@@ -182,7 +184,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
 
             if (_currentStatus.IsNotRunning())
             {
-                Logger.Log(Level.Warning, 
string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is 
in {0} state. Ignored.", _currentStatus.State));
+                Logger.Log(Level.Warning, 
string.Format(CultureInfo.InvariantCulture, "Trying to suspend an task that is 
in {0} state. Ignored.", _currentStatus.State));
                 return;
             }
             try
@@ -220,7 +222,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         public void OnNext(ICloseEvent value)
         {
             Logger.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)");
-            //// TODO: send a heartbeat
+            _closeHandlerFuture.Get().OnNext(value);
+
+            // TODO: send a heartbeat
         }
 
         public void OnNext(ISuspendEvent value)

http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskCloseHandler.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskCloseHandler.cs 
b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskCloseHandler.cs
new file mode 100644
index 0000000..a497b22
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskCloseHandler.cs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Common.Tasks.Exceptions;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Tasks.Defaults
+{
+    /// <summary>
+    /// A default handler for an event from the driver signaling the closing
+    /// of a task. Throws an exception by default, since a task should not 
have received
+    /// a close event if the handler is not bound explicitly.
+    /// </summary>
+    public sealed class DefaultTaskCloseHandler : IObserver<ICloseEvent>
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(DefaultTaskCloseHandler));
+        public const string ExceptionMessage = "No EventHandler<CloseEvent> 
registered. Event received.";
+
+        [Inject]
+        private DefaultTaskCloseHandler()
+        {
+        }
+
+        public void OnCompleted()
+        {
+            Utilities.Diagnostics.Exceptions.Throw(new 
NotImplementedException(), Logger);
+        }
+
+        public void OnError(Exception error)
+        {
+            Utilities.Diagnostics.Exceptions.Throw(new 
NotImplementedException(), Logger);
+        }
+
+        public void OnNext(ICloseEvent value)
+        {
+            Utilities.Diagnostics.Exceptions.Throw(new 
TaskCloseHandlerNotBoundException(ExceptionMessage + value), Logger);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskCloseHandlerNotBoundException.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskCloseHandlerNotBoundException.cs
 
b/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskCloseHandlerNotBoundException.cs
new file mode 100644
index 0000000..7dd94a8
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskCloseHandlerNotBoundException.cs
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+
+namespace Org.Apache.REEF.Common.Tasks.Exceptions
+{
+    /// <summary>
+    /// An exception that is thrown when the task close event
+    /// handler is not bound.
+    /// </summary>
+    internal sealed class TaskCloseHandlerNotBoundException : Exception
+    {
+        internal TaskCloseHandlerNotBoundException(string message)
+            : base(message)
+        {
+        }
+
+        internal TaskCloseHandlerNotBoundException(Exception innerException)
+            : base(innerException.Message, innerException)
+        {
+        }
+
+        internal TaskCloseHandlerNotBoundException(string message, Exception 
innerException)
+            : base(message, innerException)
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs 
b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs
index 30dce1a..d5e2471 100644
--- a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs
@@ -50,7 +50,7 @@ namespace Org.Apache.REEF.Common.Tasks
         {
         }
 
-        [NamedParameter(documentation: "The event handler that receives the 
close event.")]
+        [NamedParameter(documentation: "The event handler that receives the 
close event.", defaultClass: typeof(DefaultTaskCloseHandler))]
         public class CloseHandler : Name<IObserver<ICloseEvent>>
         {
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
index 9cc3cd3..3bf553b 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
@@ -21,15 +21,19 @@ using System.Text;
 using System.Threading;
 using Org.Apache.REEF.Common.Context;
 using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Defaults;
+using Org.Apache.REEF.Common.Tasks.Events;
 using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
 using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities;
 using Org.Apache.REEF.Utilities.Logging;
 using Xunit;
 
@@ -45,6 +49,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         private const string DisposeMessageFromDriver = 
"DisposeMessageFromDriver";
         private const string NoMessage = "NO_MESSAGE";
         private const string CompletedValidationMessage = 
"CompletedValidationmessage";
+        private const string FailToCloseTaskMessage = "FailToCloseTaskMessage";
 
         public TestCloseTask()
         {
@@ -52,29 +57,69 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         }
 
         /// <summary>
-        /// This test is to close a running task over the bridge
+        /// This test is to close a running task with a close handler registered
         /// </summary>
         [Fact]
         public void TestStopTaskOnLocalRuntime()
         {
             string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
-            TestRun(DriverConfigurations(DisposeMessageFromDriver), 
typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder);
+            TestRun(DriverConfigurations(DisposeMessageFromDriver, 
GetTaskConfigurationForCloseTask()), typeof(CloseTaskTestDriver), 1, 
"testStopTask", "local", testFolder);
             ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
             
ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, 
testFolder, 1);
             var messages = new List<string>();
             messages.Add(DisposeMessageFromDriver);
+            ValidateMessageSuccessfullyLogged(messages, "Node-*", 
EvaluatorStdout, testFolder, 2);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// This test is to close a running task with an exception thrown in the close 
handler
+        /// Expect to receive Exception in Failed Task event handler in driver
+        /// </summary>
+        [Fact]
+        public void TestStopTaskWithExceptionOnLocalRuntime()
+        {
+            const string successIndication = "EXIT: 
ActiveContextClr2Java::Close";
+            const string failedTaskIndication = 
"Java_org_apache_reef_javabridge_NativeInterop_clrSystemFailedTaskHandlerOnNext";
+
+            string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfigurations(DisposeMessageFromDriver, 
GetTaskConfigurationForFailToCloseTask()), typeof(CloseTaskTestDriver), 1, 
"testStopTask", "local", testFolder);
+            var messages = new List<string>();
+            messages.Add(successIndication);
+            messages.Add(failedTaskIndication);
+            ValidateMessageSuccessfullyLogged(messages, "driver", 
DriverStdout, testFolder, 1);
+
+            var messages1 = new List<string>();
+            messages1.Add(DisposeMessageFromDriver);
+            ValidateMessageSuccessfullyLogged(messages1, "Node-*", 
EvaluatorStdout, testFolder, 2);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// This test is to close a running task over the bridge without close 
handler bound
+        /// Expect to get TaskCloseHandlerNotBoundException
+        /// </summary>
+        [Fact]
+        public void TestTaskWithNoCloseHandlerOnLocalRuntime()
+        {
+            const string closeHandlerNoBound = "ExceptionCaught 
TaskCloseHandlerNotBoundException";
+
+            string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfigurations(DisposeMessageFromDriver, 
GetTaskConfigurationForNoCloseHandlerTask()), typeof(CloseTaskTestDriver), 1, 
"testStopTask", "local", testFolder);
+            var messages = new List<string>();
+            messages.Add(closeHandlerNoBound);
             ValidateMessageSuccessfullyLogged(messages, "Node-*", 
EvaluatorStdout, testFolder, 1);
             CleanUp(testFolder);
         }
 
         /// <summary>
-        /// This test is to close a running task over the bridge
+        /// This test is to close a running task over the bridge with null 
message
         /// </summary>
         [Fact]
         public void TestStopTaskOnLocalRuntimeWithNullMessage()
         {
             string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
-            TestRun(DriverConfigurations(NoMessage), 
typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder);
+            TestRun(DriverConfigurations(NoMessage, 
GetTaskConfigurationForCloseTask()), typeof(CloseTaskTestDriver), 1, 
"testStopTask", "local", testFolder);
             ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
             
ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, 
testFolder, 1);
             var messages = new List<string>();
@@ -83,11 +128,37 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             CleanUp(testFolder);
         }
 
+        private IConfiguration GetTaskConfigurationForCloseTask()
+        {
+            return TaskConfiguration.ConfigurationModule
+                .Set(TaskConfiguration.Identifier, "TaskID")
+                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.CloseTestTask>.Class)
+                .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.CloseTestTask>.Class)
+                .Build();
+        }
+
+        private IConfiguration GetTaskConfigurationForFailToCloseTask()
+        {
+            return TaskConfiguration.ConfigurationModule
+                .Set(TaskConfiguration.Identifier, "TaskID-FailToClose")
+                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.FailToCloseTask>.Class)
+                .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.FailToCloseTask>.Class)
+                .Build();
+        }
+
+        private IConfiguration GetTaskConfigurationForNoCloseHandlerTask()
+        {
+            return TaskConfiguration.ConfigurationModule
+                .Set(TaskConfiguration.Identifier, "TaskID-NoCloseHandler")
+                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.NoCloseHandlerTask>.Class)
+                .Build();
+        }
+
         /// <summary>
         /// Driver configuration for the test driver
         /// </summary>
         /// <returns></returns>
-        public IConfiguration DriverConfigurations(string taskCloseMessage)
+        public IConfiguration DriverConfigurations(string taskCloseMessage, 
IConfiguration taskConfig)
         {
             var handlerConfig = DriverConfiguration.ConfigurationModule
                 .Set(DriverConfiguration.OnDriverStarted, 
GenericType<CloseTaskTestDriver>.Class)
@@ -95,10 +166,13 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 .Set(DriverConfiguration.OnContextActive, 
GenericType<CloseTaskTestDriver>.Class)
                 .Set(DriverConfiguration.OnTaskRunning, 
GenericType<CloseTaskTestDriver>.Class)
                 .Set(DriverConfiguration.OnTaskCompleted, 
GenericType<CloseTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskFailed, 
GenericType<CloseTaskTestDriver>.Class)
                 .Build();
 
+            AvroConfigurationSerializer serializer = new 
AvroConfigurationSerializer();
             var messageConfig = TangFactory.GetTang().NewConfigurationBuilder()
                 .BindStringNamedParam<DisposeMessage>(taskCloseMessage)
+                
.BindStringNamedParam<TaskConfigurationString>(serializer.ToString(taskConfig))
                 .Build();
 
             return Configurations.Merge(handlerConfig, messageConfig);
@@ -109,24 +183,33 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         {
         }
 
+        [NamedParameter("Task Configuration string", 
"TaskConfigurationString")]
+        private class TaskConfigurationString : Name<string>
+        {
+        }
+
         private sealed class CloseTaskTestDriver :
             IObserver<IDriverStarted>,
             IObserver<IAllocatedEvaluator>,
             IObserver<IActiveContext>,
             IObserver<ICompletedTask>,
+            IObserver<IFailedTask>,
             IObserver<IRunningTask>           
         {
             private readonly IEvaluatorRequestor _requestor;
             private int _contextNumber = 0;
-            private int _taskNumber = 0;
             private string _disposeMessage;
+            private IConfiguration _taskConfiguration;
 
             [Inject]
             private CloseTaskTestDriver(IEvaluatorRequestor evaluatorRequestor,
-                [Parameter(typeof(DisposeMessage))] string disposeMessage)
+                [Parameter(typeof(DisposeMessage))] string disposeMessage,
+                [Parameter(typeof(TaskConfigurationString))] string 
taskConfigString,
+                AvroConfigurationSerializer avroConfigurationSerializer)
             {
                 _requestor = evaluatorRequestor;
                 _disposeMessage = disposeMessage;
+                _taskConfiguration = 
avroConfigurationSerializer.FromString(taskConfigString);
             }
 
             public void OnNext(IDriverStarted value)
@@ -144,16 +227,33 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
 
             public void OnNext(IActiveContext value)
             {
-                value.SubmitTask(GetTaskConfiguration());
+                value.SubmitTask(_taskConfiguration);
             }
 
             public void OnNext(ICompletedTask value)
             {
                 // Log on task completion to signal a passed test.
-                Logger.Log(Level.Info, CompletedValidationMessage + "Task 
completed: " + value.Id);
+                Logger.Log(Level.Info, CompletedValidationMessage + ". Task 
completed: " + value.Id);
                 value.ActiveContext.Dispose();
             }
 
+            public void OnNext(IFailedTask value)
+            {
+                var failedExeption = 
ByteUtilities.ByteArraysToString(value.Data.Value);
+                Logger.Log(Level.Error, "In IFailedTask: " + failedExeption);
+
+                if (value.Id.EndsWith("TaskID-FailToClose"))
+                {
+                    Assert.Contains(FailToCloseTaskMessage, failedExeption);
+                }
+                if (value.Id.EndsWith("TaskID-NoCloseHandler"))
+                {
+                    Assert.Contains(DefaultTaskCloseHandler.ExceptionMessage, 
failedExeption);
+                }
+                
+                value.GetActiveContext().Value.Dispose();
+            }
+
             public void OnNext(IRunningTask value)
             {
                 Logger.Log(Level.Info, "Task running: " + value.Id);
@@ -167,12 +267,100 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 }
             }
 
-            private IConfiguration GetTaskConfiguration()
+            public void OnCompleted()
             {
-                return TaskConfiguration.ConfigurationModule
-                    .Set(TaskConfiguration.Identifier, "TaskID" + 
_taskNumber++)
-                    .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.StopTestTask>.Class)
-                    .Build();
+                throw new NotImplementedException();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private sealed class CloseTestTask : ITask, IObserver<ICloseEvent>
+        {
+            private readonly CountdownEvent _suspendSignal = new 
CountdownEvent(1);
+
+            [Inject]
+            private CloseTestTask()
+            {
+            }
+
+            public byte[] Call(byte[] memento)
+            {
+                Logger.Log(Level.Info, "Hello in StopTestTask");
+                _suspendSignal.Wait();
+                return null;
+            }
+
+            public void Dispose()
+            {
+                Logger.Log(Level.Info, "Task is disposed.");
+            }
+
+            public void OnNext(ICloseEvent value)
+            {
+                try
+                {
+                    if (value.Value != null && value.Value.Value != null)
+                    {
+                        Logger.Log(Level.Info, "Closed event received in 
task:" + Encoding.UTF8.GetString(value.Value.Value));
+                        
Assert.Equal(Encoding.UTF8.GetString(value.Value.Value), 
DisposeMessageFromDriver);
+                    }                    
+                }
+                finally
+                {
+                    _suspendSignal.Signal();
+                }
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private sealed class FailToCloseTask : ITask, IObserver<ICloseEvent>
+        {
+            private readonly CountdownEvent _suspendSignal = new 
CountdownEvent(1);
+
+            [Inject]
+            private FailToCloseTask()
+            {
+            }
+
+            public byte[] Call(byte[] memento)
+            {
+                Logger.Log(Level.Info, "Hello in FailtToCloseTask");
+                _suspendSignal.Wait();
+                return null;
+            }
+
+            public void Dispose()
+            {
+                Logger.Log(Level.Info, "Task is disposed.");
+            }
+
+            public void OnNext(ICloseEvent value)
+            {
+                try
+                {
+                    if (value.Value != null && value.Value.Value != null)
+                    {
+                        Logger.Log(Level.Info, "Closed event received in 
task:" + Encoding.UTF8.GetString(value.Value.Value));
+                        
Assert.Equal(Encoding.UTF8.GetString(value.Value.Value), 
DisposeMessageFromDriver);
+                    }
+                }
+                finally
+                {
+                    throw new Exception(FailToCloseTaskMessage);
+                }
             }
 
             public void OnCompleted()
@@ -186,17 +374,17 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             }
         }
 
-        private sealed class StopTestTask : ITask
+        private sealed class NoCloseHandlerTask : ITask
         {
             [Inject]
-            private StopTestTask()
+            private NoCloseHandlerTask()
             {
             }
 
             public byte[] Call(byte[] memento)
             {
-                // TODO[REEF-1257]
-                Thread.Sleep(5 * 1000);
+                Logger.Log(Level.Info, "Hello in NoCloseHandlerTask");
+                Thread.Sleep(50 * 1000);
                 return null;
             }
 

Reply via email to