Repository: reef Updated Branches: refs/heads/master 895bcfa5c -> c750edf9e
[REEF-1257] Add TaskCloseHandler in TaskRunTime * Inject TaskCloseHandler in TaskRunTime * Handle Close event * Add DefaultCloseTaskHandler * Updated test cases JIRA: [REEF-1257](https://issues.apache.org/jira/browse/REEF-1257) Pull Request: This closes #889 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c750edf9 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c750edf9 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c750edf9 Branch: refs/heads/master Commit: c750edf9e15b7608b1d3e5190c84093b646f4de7 Parents: 895bcfa Author: Julia Wang <[email protected]> Authored: Tue Mar 15 19:01:34 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri Mar 18 16:25:34 2016 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Common.csproj | 2 + .../Runtime/Evaluator/Task/TaskRuntime.cs | 16 +- .../Tasks/Defaults/DefaultTaskCloseHandler.cs | 56 +++++ .../TaskCloseHandlerNotBoundException.cs | 43 ++++ .../Tasks/TaskConfigurationOptions.cs | 2 +- .../Functional/Bridge/TestCloseTask.cs | 224 +++++++++++++++++-- 6 files changed, 318 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index dcb0c1f..307c558 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -158,6 +158,7 @@ under the License. <Compile Include="Runtime\Evaluator\Utils\NamedparameterAlias.cs" /> <Compile Include="runtime\MachineStatus.cs" /> <Compile Include="Services\ServiceConfiguration.cs" /> + <Compile Include="Tasks\Defaults\DefaultTaskCloseHandler.cs" /> <Compile Include="Tasks\Defaults\DefaultDriverConnectionMessageHandler.cs" /> <Compile Include="Tasks\Defaults\DefaultDriverMessageHandler.cs" /> <Compile Include="Tasks\Defaults\DefaultSuspendHandler.cs" /> @@ -169,6 +170,7 @@ under the License. <Compile Include="Tasks\Events\ISuspendEvent.cs" /> <Compile Include="Tasks\Events\ITaskStart.cs" /> <Compile Include="Tasks\Events\ITaskStop.cs" /> + <Compile Include="Tasks\Exceptions\TaskCloseHandlerNotBoundException.cs" /> <Compile Include="Tasks\Exceptions\TaskSuspendHandlerException.cs" /> <Compile Include="Tasks\IDriverConnectionMessageHandler.cs" /> <Compile Include="Tasks\IDriverMessageHandler.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index dffae22..5cc22ec 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -16,17 +16,15 @@ // under the License. using System; -using System.Collections.Generic; using System.Globalization; +using System.Text; using System.Threading; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.Common.Tasks.Exceptions; using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Implementations.InjectionPlan; -using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; @@ -42,6 +40,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task private readonly Optional<IDriverMessageHandler> _driverMessageHandler; private readonly ITask _userTask; private readonly IInjectionFuture<IObserver<ISuspendEvent>> _suspendHandlerFuture; + private readonly IInjectionFuture<IObserver<ICloseEvent>> _closeHandlerFuture; private int _taskRan = 0; [Inject] @@ -50,13 +49,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task IDriverMessageHandler driverMessageHandler, IDriverConnectionMessageHandler driverConnectionMessageHandler, TaskStatus taskStatus, - [Parameter(typeof(TaskConfigurationOptions.SuspendHandler))] IInjectionFuture<IObserver<ISuspendEvent>> suspendHandlerFuture) + [Parameter(typeof(TaskConfigurationOptions.SuspendHandler))] IInjectionFuture<IObserver<ISuspendEvent>> suspendHandlerFuture, + [Parameter(typeof(TaskConfigurationOptions.CloseHandler))] IInjectionFuture<IObserver<ICloseEvent>> closedHandlerFuture) { _currentStatus = taskStatus; _driverMessageHandler = Optional<IDriverMessageHandler>.Of(driverMessageHandler); _driverConnectionMessageHandler = Optional<IDriverConnectionMessageHandler>.Of(driverConnectionMessageHandler); _userTask = userTask; _suspendHandlerFuture = suspendHandlerFuture; + _closeHandlerFuture = closedHandlerFuture; } public string TaskId @@ -159,6 +160,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void Close(byte[] message) { Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId)); + if (_currentStatus.IsNotRunning()) { Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State)); @@ -182,7 +184,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task if (_currentStatus.IsNotRunning()) { - Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State)); + Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to suspend an task that is in {0} state. Ignored.", _currentStatus.State)); return; } try @@ -220,7 +222,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void OnNext(ICloseEvent value) { Logger.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)"); - //// TODO: send a heartbeat + _closeHandlerFuture.Get().OnNext(value); + + // TODO: send a heartbeat } public void OnNext(ISuspendEvent value) http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskCloseHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskCloseHandler.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskCloseHandler.cs new file mode 100644 index 0000000..a497b22 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskCloseHandler.cs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Common.Tasks.Exceptions; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Common.Tasks.Defaults +{ + /// <summary> + /// A default handler for an event from the driver signaling the suspension + /// of a task. Throws an exception by default, since a task should not have received + /// a suspension event if the handler is not bound explicitly. + /// </summary> + public sealed class DefaultTaskCloseHandler : IObserver<ICloseEvent> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(DefaultTaskCloseHandler)); + public const string ExceptionMessage = "No EventHandler<CloseEvent> registered. Event received."; + + [Inject] + private DefaultTaskCloseHandler() + { + } + + public void OnCompleted() + { + Utilities.Diagnostics.Exceptions.Throw(new NotImplementedException(), Logger); + } + + public void OnError(Exception error) + { + Utilities.Diagnostics.Exceptions.Throw(new NotImplementedException(), Logger); + } + + public void OnNext(ICloseEvent value) + { + Utilities.Diagnostics.Exceptions.Throw(new TaskCloseHandlerNotBoundException(ExceptionMessage + value), Logger); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskCloseHandlerNotBoundException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskCloseHandlerNotBoundException.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskCloseHandlerNotBoundException.cs new file mode 100644 index 0000000..7dd94a8 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskCloseHandlerNotBoundException.cs @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; + +namespace Org.Apache.REEF.Common.Tasks.Exceptions +{ + /// <summary> + /// An exception that is thrown when the task close event + /// handler is not bound. + /// </summary> + internal sealed class TaskCloseHandlerNotBoundException : Exception + { + internal TaskCloseHandlerNotBoundException(string message) + : base(message) + { + } + + internal TaskCloseHandlerNotBoundException(Exception innerException) + : base(innerException.Message, innerException) + { + } + + internal TaskCloseHandlerNotBoundException(string message, Exception innerException) + : base(message, innerException) + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs index 30dce1a..d5e2471 100644 --- a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs @@ -50,7 +50,7 @@ namespace Org.Apache.REEF.Common.Tasks { } - [NamedParameter(documentation: "The event handler that receives the close event.")] + [NamedParameter(documentation: "The event handler that receives the close event.", defaultClass: typeof(DefaultTaskCloseHandler))] public class CloseHandler : Name<IObserver<ICloseEvent>> { } http://git-wip-us.apache.org/repos/asf/reef/blob/c750edf9/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs index 9cc3cd3..3bf553b 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs @@ -21,15 +21,19 @@ using System.Text; using System.Threading; using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Defaults; +using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; using Xunit; @@ -45,6 +49,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge private const string DisposeMessageFromDriver = "DisposeMessageFromDriver"; private const string NoMessage = "NO_MESSAGE"; private const string CompletedValidationMessage = "CompletedValidationmessage"; + private const string FailToCloseTaskMessage = "FailToCloseTaskMessage"; public TestCloseTask() { @@ -52,29 +57,69 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge } /// <summary> - /// This test is to close a running task over the bridge + /// This test is close a running task with a close handler registered /// </summary> [Fact] public void TestStopTaskOnLocalRuntime() { string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); - TestRun(DriverConfigurations(DisposeMessageFromDriver), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder); + TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForCloseTask()), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder); ValidateSuccessForLocalRuntime(1, testFolder: testFolder); ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 1); var messages = new List<string>(); messages.Add(DisposeMessageFromDriver); + ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 2); + CleanUp(testFolder); + } + + /// <summary> + /// This test is to close a running task with exception throw in close handler + /// Expect to receive Exception in Failed Task event handler in driver + /// </summary> + [Fact] + public void TestStopTaskWithExceptionOnLocalRuntime() + { + const string successIndication = "EXIT: ActiveContextClr2Java::Close"; + const string failedTaskIndication = "Java_org_apache_reef_javabridge_NativeInterop_clrSystemFailedTaskHandlerOnNext"; + + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForFailToCloseTask()), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder); + var messages = new List<string>(); + messages.Add(successIndication); + messages.Add(failedTaskIndication); + ValidateMessageSuccessfullyLogged(messages, "driver", DriverStdout, testFolder, 1); + + var messages1 = new List<string>(); + messages1.Add(DisposeMessageFromDriver); + ValidateMessageSuccessfullyLogged(messages1, "Node-*", EvaluatorStdout, testFolder, 2); + CleanUp(testFolder); + } + + /// <summary> + /// This test is to close a running task over the bridge without close handler bound + /// Expect to get TaskCloseHandlerNotBoundException + /// </summary> + [Fact] + public void TestTaskWithNoCloseHandlerOnLocalRuntime() + { + const string closeHandlerNoBound = "ExceptionCaught TaskCloseHandlerNotBoundException"; + + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForNoCloseHandlerTask()), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder); + var messages = new List<string>(); + messages.Add(closeHandlerNoBound); ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1); CleanUp(testFolder); } /// <summary> - /// This test is to close a running task over the bridge + /// This test is to close a running task over the bridge with null message /// </summary> [Fact] public void TestStopTaskOnLocalRuntimeWithNullMessage() { string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); - TestRun(DriverConfigurations(NoMessage), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder); + TestRun(DriverConfigurations(NoMessage, GetTaskConfigurationForCloseTask()), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder); ValidateSuccessForLocalRuntime(1, testFolder: testFolder); ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 1); var messages = new List<string>(); @@ -83,11 +128,37 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge CleanUp(testFolder); } + private IConfiguration GetTaskConfigurationForCloseTask() + { + return TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "TaskID") + .Set(TaskConfiguration.Task, GenericType<TestCloseTask.CloseTestTask>.Class) + .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.CloseTestTask>.Class) + .Build(); + } + + private IConfiguration GetTaskConfigurationForFailToCloseTask() + { + return TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "TaskID-FailToClose") + .Set(TaskConfiguration.Task, GenericType<TestCloseTask.FailToCloseTask>.Class) + .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.FailToCloseTask>.Class) + .Build(); + } + + private IConfiguration GetTaskConfigurationForNoCloseHandlerTask() + { + return TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "TaskID-NoCloseHandler") + .Set(TaskConfiguration.Task, GenericType<TestCloseTask.NoCloseHandlerTask>.Class) + .Build(); + } + /// <summary> /// Driver configuration for the test driver /// </summary> /// <returns></returns> - public IConfiguration DriverConfigurations(string taskCloseMessage) + public IConfiguration DriverConfigurations(string taskCloseMessage, IConfiguration taskConfig) { var handlerConfig = DriverConfiguration.ConfigurationModule .Set(DriverConfiguration.OnDriverStarted, GenericType<CloseTaskTestDriver>.Class) @@ -95,10 +166,13 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge .Set(DriverConfiguration.OnContextActive, GenericType<CloseTaskTestDriver>.Class) .Set(DriverConfiguration.OnTaskRunning, GenericType<CloseTaskTestDriver>.Class) .Set(DriverConfiguration.OnTaskCompleted, GenericType<CloseTaskTestDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<CloseTaskTestDriver>.Class) .Build(); + AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); var messageConfig = TangFactory.GetTang().NewConfigurationBuilder() .BindStringNamedParam<DisposeMessage>(taskCloseMessage) + .BindStringNamedParam<TaskConfigurationString>(serializer.ToString(taskConfig)) .Build(); return Configurations.Merge(handlerConfig, messageConfig); @@ -109,24 +183,33 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { } + [NamedParameter("Task Configuration string", "TaskConfigurationString")] + private class TaskConfigurationString : Name<string> + { + } + private sealed class CloseTaskTestDriver : IObserver<IDriverStarted>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<ICompletedTask>, + IObserver<IFailedTask>, IObserver<IRunningTask> { private readonly IEvaluatorRequestor _requestor; private int _contextNumber = 0; - private int _taskNumber = 0; private string _disposeMessage; + private IConfiguration _taskConfiguration; [Inject] private CloseTaskTestDriver(IEvaluatorRequestor evaluatorRequestor, - [Parameter(typeof(DisposeMessage))] string disposeMessage) + [Parameter(typeof(DisposeMessage))] string disposeMessage, + [Parameter(typeof(TaskConfigurationString))] string taskConfigString, + AvroConfigurationSerializer avroConfigurationSerializer) { _requestor = evaluatorRequestor; _disposeMessage = disposeMessage; + _taskConfiguration = avroConfigurationSerializer.FromString(taskConfigString); } public void OnNext(IDriverStarted value) @@ -144,16 +227,33 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge public void OnNext(IActiveContext value) { - value.SubmitTask(GetTaskConfiguration()); + value.SubmitTask(_taskConfiguration); } public void OnNext(ICompletedTask value) { // Log on task completion to signal a passed test. - Logger.Log(Level.Info, CompletedValidationMessage + "Task completed: " + value.Id); + Logger.Log(Level.Info, CompletedValidationMessage + ". Task completed: " + value.Id); value.ActiveContext.Dispose(); } + public void OnNext(IFailedTask value) + { + var failedExeption = ByteUtilities.ByteArraysToString(value.Data.Value); + Logger.Log(Level.Error, "In IFailedTask: " + failedExeption); + + if (value.Id.EndsWith("TaskID-FailToClose")) + { + Assert.Contains(FailToCloseTaskMessage, failedExeption); + } + if (value.Id.EndsWith("TaskID-NoCloseHandler")) + { + Assert.Contains(DefaultTaskCloseHandler.ExceptionMessage, failedExeption); + } + + value.GetActiveContext().Value.Dispose(); + } + public void OnNext(IRunningTask value) { Logger.Log(Level.Info, "Task running: " + value.Id); @@ -167,12 +267,100 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge } } - private IConfiguration GetTaskConfiguration() + public void OnCompleted() { - return TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, "TaskID" + _taskNumber++) - .Set(TaskConfiguration.Task, GenericType<TestCloseTask.StopTestTask>.Class) - .Build(); + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + } + + private sealed class CloseTestTask : ITask, IObserver<ICloseEvent> + { + private readonly CountdownEvent _suspendSignal = new CountdownEvent(1); + + [Inject] + private CloseTestTask() + { + } + + public byte[] Call(byte[] memento) + { + Logger.Log(Level.Info, "Hello in StopTestTask"); + _suspendSignal.Wait(); + return null; + } + + public void Dispose() + { + Logger.Log(Level.Info, "Task is disposed."); + } + + public void OnNext(ICloseEvent value) + { + try + { + if (value.Value != null && value.Value.Value != null) + { + Logger.Log(Level.Info, "Closed event received in task:" + Encoding.UTF8.GetString(value.Value.Value)); + Assert.Equal(Encoding.UTF8.GetString(value.Value.Value), DisposeMessageFromDriver); + } + } + finally + { + _suspendSignal.Signal(); + } + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + } + + private sealed class FailToCloseTask : ITask, IObserver<ICloseEvent> + { + private readonly CountdownEvent _suspendSignal = new CountdownEvent(1); + + [Inject] + private FailToCloseTask() + { + } + + public byte[] Call(byte[] memento) + { + Logger.Log(Level.Info, "Hello in FailtToCloseTask"); + _suspendSignal.Wait(); + return null; + } + + public void Dispose() + { + Logger.Log(Level.Info, "Task is disposed."); + } + + public void OnNext(ICloseEvent value) + { + try + { + if (value.Value != null && value.Value.Value != null) + { + Logger.Log(Level.Info, "Closed event received in task:" + Encoding.UTF8.GetString(value.Value.Value)); + Assert.Equal(Encoding.UTF8.GetString(value.Value.Value), DisposeMessageFromDriver); + } + } + finally + { + throw new Exception(FailToCloseTaskMessage); + } } public void OnCompleted() @@ -186,17 +374,17 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge } } - private sealed class StopTestTask : ITask + private sealed class NoCloseHandlerTask : ITask { [Inject] - private StopTestTask() + private NoCloseHandlerTask() { } public byte[] Call(byte[] memento) { - // TODO[REEF-1257] - Thread.Sleep(5 * 1000); + Logger.Log(Level.Info, "Hello in NoCloseHandlerTask"); + Thread.Sleep(50 * 1000); return null; }
