Repository: reef Updated Branches: refs/heads/master 061f66277 -> 4c9c36852
[REEF-1256] Implement bridge for IRunningTask.Dispose() * Implement bridge method for closing a task * Add test case JIRA: [REEF-1256](https://issues.apache.org/jira/browse/REEF-1256) Pull Request: This closes #888 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4c9c3685 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4c9c3685 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4c9c3685 Branch: refs/heads/master Commit: 4c9c368520fdec66f6c3c25359699f230cc9f23c Parents: 061f662 Author: Julia Wang <[email protected]> Authored: Tue Mar 15 10:54:11 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue Mar 15 15:55:05 2016 -0700 ---------------------------------------------------------------------- lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 1 + .../RunningTaskClr2Java.cpp | 26 +++ .../Bridge/Clr2java/IRunningTaskClr2Java.cs | 2 + .../Bridge/Events/RunningTask.cs | 2 + .../Functional/Bridge/TestCloseTask.cs | 209 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 1 + .../reef/javabridge/RunningTaskBridge.java | 8 + 7 files changed, 249 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h index 3a2c60c..381df15 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h +++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h @@ -142,6 +142,7 @@ namespace Org { virtual String^ GetId(); virtual void Send(array<byte>^ message); virtual void Suspend(array<byte>^ message); + virtual void Close(array<byte>^ message); }; public ref class FailedEvaluatorClr2Java : public IFailedEvaluatorClr2Java { http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp index 71d6451..d9e264d 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp @@ -109,6 +109,32 @@ namespace Org { ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Suspend"); } + void RunningTaskClr2Java::Close(array<byte>^ message) { + ManagedLog::LOGGER->LogStart("RunningTaskClr2Java::Close"); + JNIEnv *env = RetrieveEnv(_jvm); + jclass jclassRunningTask = env->GetObjectClass(_jobjectRunningTask); + jmethodID jmidClose = env->GetMethodID(jclassRunningTask, "close", "([B)V"); + + if (jmidClose == NULL) { + ManagedLog::LOGGER->Log("jmidClose is NULL"); + return; + } + + if (message == nullptr) { + env->CallObjectMethod( + _jobjectRunningTask, + jmidClose, + NULL); + } + else { + env->CallObjectMethod( + _jobjectRunningTask, + jmidClose, + JavaByteArrayFromManagedByteArray(env, message)); + } + ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Close"); + } + void RunningTaskClr2Java::OnError(String^ message) { ManagedLog::LOGGER->Log("RunningTaskClr2Java::OnError"); JNIEnv *env = RetrieveEnv(_jvm); http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs index fde12db..656b57a 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs @@ -29,5 +29,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java void Send(byte[] message); void Suspend(byte[] message); + + void Close(byte[] message); } } http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs index e993b1c..dbf2d58 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs @@ -70,10 +70,12 @@ namespace Org.Apache.REEF.Driver.Bridge.Events public void Dispose(byte[] message) { + _runningTaskClr2Java.Close(message); } public void Dispose() { + Dispose(null); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs new file mode 100644 index 0000000..9cc3cd3 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Bridge +{ + /// <summary> + /// This test is to close a running task from driver + /// </summary> + public sealed class TestCloseTask : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TestCloseTask)); + + private const string DisposeMessageFromDriver = "DisposeMessageFromDriver"; + private const string NoMessage = "NO_MESSAGE"; + private const string CompletedValidationMessage = "CompletedValidationmessage"; + + public TestCloseTask() + { + Init(); + } + + /// <summary> + /// This test is to close a running task over the bridge + /// </summary> + [Fact] + public void TestStopTaskOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfigurations(DisposeMessageFromDriver), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder); + ValidateSuccessForLocalRuntime(1, testFolder: testFolder); + ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 1); + var messages = new List<string>(); + messages.Add(DisposeMessageFromDriver); + ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1); + CleanUp(testFolder); + } + + /// <summary> + /// This test is to close a running task over the bridge + /// </summary> + [Fact] + public void TestStopTaskOnLocalRuntimeWithNullMessage() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfigurations(NoMessage), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder); + ValidateSuccessForLocalRuntime(1, testFolder: testFolder); + ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 1); + var messages = new List<string>(); + messages.Add("Control protobuf to stop task"); + ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1); + CleanUp(testFolder); + } + + /// <summary> + /// Driver configuration for the test driver + /// </summary> + /// <returns></returns> + public IConfiguration DriverConfigurations(string taskCloseMessage) + { + var handlerConfig = DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<CloseTaskTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<CloseTaskTestDriver>.Class) + .Set(DriverConfiguration.OnContextActive, GenericType<CloseTaskTestDriver>.Class) + .Set(DriverConfiguration.OnTaskRunning, GenericType<CloseTaskTestDriver>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<CloseTaskTestDriver>.Class) + .Build(); + + var messageConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindStringNamedParam<DisposeMessage>(taskCloseMessage) + .Build(); + + return Configurations.Merge(handlerConfig, messageConfig); + } + + [NamedParameter("Message send with task close", "TaskDisposeMessage", NoMessage)] + private class DisposeMessage : Name<string> + { + } + + private sealed class CloseTaskTestDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IActiveContext>, + IObserver<ICompletedTask>, + IObserver<IRunningTask> + { + private readonly IEvaluatorRequestor _requestor; + private int _contextNumber = 0; + private int _taskNumber = 0; + private string _disposeMessage; + + [Inject] + private CloseTaskTestDriver(IEvaluatorRequestor evaluatorRequestor, + [Parameter(typeof(DisposeMessage))] string disposeMessage) + { + _requestor = evaluatorRequestor; + _disposeMessage = disposeMessage; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().SetNumber(1).Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + value.SubmitContext( + ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, "ContextID" + _contextNumber++) + .Build()); + } + + public void OnNext(IActiveContext value) + { + value.SubmitTask(GetTaskConfiguration()); + } + + public void OnNext(ICompletedTask value) + { + // Log on task completion to signal a passed test. + Logger.Log(Level.Info, CompletedValidationMessage + "Task completed: " + value.Id); + value.ActiveContext.Dispose(); + } + + public void OnNext(IRunningTask value) + { + Logger.Log(Level.Info, "Task running: " + value.Id); + if (_disposeMessage.Equals(NoMessage)) + { + value.Dispose(); + } + else + { + value.Dispose(Encoding.UTF8.GetBytes(_disposeMessage)); + } + } + + private IConfiguration GetTaskConfiguration() + { + return TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "TaskID" + _taskNumber++) + .Set(TaskConfiguration.Task, GenericType<TestCloseTask.StopTestTask>.Class) + .Build(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + } + + private sealed class StopTestTask : ITask + { + [Inject] + private StopTestTask() + { + } + + public byte[] Call(byte[] memento) + { + // TODO[REEF-1257] + Thread.Sleep(5 * 1000); + return null; + } + + public void Dispose() + { + Logger.Log(Level.Info, "Task is disposed."); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 921d862..0bc15e6 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -75,6 +75,7 @@ under the License. <ItemGroup> <Compile Include="Functional\Bridge\HelloSimpleEventHandlers.cs" /> <Compile Include="Functional\Bridge\TestBridgeClient.cs" /> + <Compile Include="Functional\Bridge\TestCloseTask.cs" /> <Compile Include="Functional\Bridge\TestContextStack.cs" /> <Compile Include="Functional\Bridge\TestFailedEvaluatorEventHandler.cs" /> <Compile Include="Functional\Bridge\TestFailedTaskEventHandler.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java index d307be7..e46b26a 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java @@ -55,6 +55,14 @@ public final class RunningTaskBridge extends NativeBridge implements Identifiabl } } + public void close(final byte[] message) { + if (message != null) { + jrunningTask.close(message); + } else { + jrunningTask.close(); + } + } + public ActiveContextBridge getActiveContext() { return jactiveContext; }
