Repository: reef Updated Branches: refs/heads/master 0318bcbed -> 5123659da
[REEF-1210] Implement SuspendTask on the Driver JIRA: [REEF-1210] https://issues.apache.org/jira/browse/REEF-1210 Pull Request: Closes #867 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/5123659d Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/5123659d Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/5123659d Branch: refs/heads/master Commit: 5123659dad6c0f95cea4b4f3b6d6d6faf1914af8 Parents: 0318bcb Author: Andrew Chung <[email protected]> Authored: Mon Feb 29 15:05:50 2016 -0800 Committer: Julia Wang <[email protected]> Committed: Tue Mar 8 18:28:53 2016 -0800 ---------------------------------------------------------------------- lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 1 + .../RunningTaskClr2Java.cpp | 18 +- .../Bridge/Clr2java/IRunningTaskClr2Java.cs | 2 + .../Bridge/Events/RunningTask.cs | 6 +- .../Functional/Bridge/TestSuspendTask.cs | 250 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 1 + .../reef/javabridge/RunningTaskBridge.java | 8 + 7 files changed, 281 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h index 6cd897d..530d14a 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h +++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h @@ -141,6 +141,7 @@ namespace Org { virtual IActiveContextClr2Java^ GetActiveContext(); virtual String^ GetId(); virtual void Send(array<byte>^ message); + virtual void Suspend(array<byte>^ message); }; public ref class FailedEvaluatorClr2Java : public IFailedEvaluatorClr2Java { http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp index 05475fd..71d6451 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp @@ -81,7 +81,6 @@ namespace Org { jclass jclassRunningTask = env->GetObjectClass(_jobjectRunningTask); jmethodID jmidSend = env->GetMethodID(jclassRunningTask, "send", "([B)V"); - if (jmidSend == NULL) { ManagedLog::LOGGER->Log("jmidSend is NULL"); return; @@ -93,6 +92,23 @@ namespace Org { ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Send"); } + void RunningTaskClr2Java::Suspend(array<byte>^ message) { + ManagedLog::LOGGER->LogStart("RunningTaskClr2Java::Suspend"); + JNIEnv *env = RetrieveEnv(_jvm); + jclass jclassRunningTask = env->GetObjectClass(_jobjectRunningTask); + jmethodID jmidSuspend = env->GetMethodID(jclassRunningTask, "suspend", "([B)V"); + + if (jmidSuspend == NULL) { + ManagedLog::LOGGER->Log("jmidSuspend is NULL"); + return; + } + env->CallObjectMethod( + _jobjectRunningTask, + jmidSuspend, + JavaByteArrayFromManagedByteArray(env, message)); + ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Suspend"); + } + void RunningTaskClr2Java::OnError(String^ message) { ManagedLog::LOGGER->Log("RunningTaskClr2Java::OnError"); JNIEnv *env = RetrieveEnv(_jvm); http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs index 3a1cb6f..fde12db 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs @@ -27,5 +27,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java string GetId(); void Send(byte[] message); + + void Suspend(byte[] message); } } http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs index cc969a6..e993b1c 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs @@ -60,22 +60,20 @@ namespace Org.Apache.REEF.Driver.Bridge.Events public void Suspend(byte[] message) { - throw new NotImplementedException(); + _runningTaskClr2Java.Suspend(message); } public void Suspend() { - throw new NotImplementedException(); + Suspend(null); } public void Dispose(byte[] message) { - throw new NotImplementedException(); } public void Dispose() { - throw new NotImplementedException(); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSuspendTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSuspendTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSuspendTask.cs new file mode 100644 index 0000000..69884a6 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSuspendTask.cs @@ -0,0 +1,250 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Text; +using System.Threading; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Events; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Bridge +{ + [Collection("FunctionalTests")] + public sealed class TestSuspendTask : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TestContextStack)); + + private const string SuspendMessageFromDriver = "SuspendMessageFromDriver"; + private const string SuspendValidationMessage = "SuspendValidationMessage"; + private const string CompletedValidationMessage = "CompletedValidationmessage"; + + public TestSuspendTask() + { + Init(); + } + + /// <summary> + /// Does a simple test of invoking suspend task with a message from the Driver + /// and makes sure the target task receives the suspend message. + /// Uses a shared context between both Tasks to record whether the suspend + /// message has been received at the Task. + /// </summary> + [Fact] + public void TestSuspendTaskOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + CleanUp(testFolder); + TestRun(DriverConfigurations(), typeof(SuspendTaskHandlers), 1, "testSuspendTask", "local", testFolder); + ValidateSuccessForLocalRuntime(1, testFolder: testFolder); + ValidateMessageSuccessfullyLogged(SuspendValidationMessage, testFolder); + ValidateMessageSuccessfullyLogged(CompletedValidationMessage, testFolder); + CleanUp(testFolder); + } + + public IConfiguration DriverConfigurations() + { + var helloDriverConfiguration = DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<SuspendTaskHandlers>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<SuspendTaskHandlers>.Class) + .Set(DriverConfiguration.OnContextActive, GenericType<SuspendTaskHandlers>.Class) + .Set(DriverConfiguration.OnTaskRunning, GenericType<SuspendTaskHandlers>.Class) + .Set(DriverConfiguration.OnTaskSuspended, GenericType<SuspendTaskHandlers>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<SuspendTaskHandlers>.Class) + .Build(); + + return TangFactory.GetTang().NewConfigurationBuilder(helloDriverConfiguration).Build(); + } + + private sealed class SuspendTaskHandlers : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IActiveContext>, + IObserver<ICompletedTask>, + IObserver<IRunningTask>, + IObserver<ISuspendedTask> + { + private readonly IEvaluatorRequestor _requestor; + + [Inject] + private SuspendTaskHandlers(IEvaluatorRequestor evaluatorRequestor) + { + _requestor = evaluatorRequestor; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IActiveContext value) + { + // Submit the Task on the first time receiving an active context. + value.SubmitTask(GetTaskConfiguration()); + } + + public void OnNext(IAllocatedEvaluator value) + { + value.SubmitContext( + ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, "ContextID") + .Set(ContextConfiguration.OnContextStart, GenericType<ContextStart>.Class) + .Build()); + } + + public void OnNext(ICompletedTask value) + { + // Log on task completion to signal a passed test. + Logger.Log(Level.Warning, CompletedValidationMessage); + value.ActiveContext.Dispose(); + } + + public void OnNext(ISuspendedTask value) + { + // Submit a second Task once the first Task has been successfully suspended + // on the same context as the first task. + Logger.Log(Level.Warning, SuspendValidationMessage); + value.ActiveContext.SubmitTask(GetTaskConfiguration()); + } + + public void OnNext(IRunningTask value) + { + // Suspend the first instance of the Task. + value.Suspend(Encoding.UTF8.GetBytes(SuspendMessageFromDriver)); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + private static IConfiguration GetTaskConfiguration() + { + return TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "TaskID") + .Set(TaskConfiguration.Task, GenericType<SuspendTestTask>.Class) + .Set(TaskConfiguration.OnSuspend, GenericType<SuspendTestTask>.Class) + .Build(); + } + + private sealed class ContextStart : IObserver<IContextStart> + { + private readonly TaskContext _taskContext; + + [Inject] + private ContextStart(TaskContext taskContext) + { + _taskContext = taskContext; + } + + public void OnNext(IContextStart value) + { + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + private sealed class TaskContext + { + [Inject] + private TaskContext() + { + TaskSuspended = false; + } + + public bool TaskSuspended { get; set; } + } + + /// <summary> + /// A Task to ensure that an object configured in the second context configuration + /// is properly injected. + /// </summary> + private sealed class SuspendTestTask : ITask, IObserver<ISuspendEvent> + { + private readonly TaskContext _taskContext; + private readonly CountdownEvent _suspendSignal = new CountdownEvent(1); + + [Inject] + private SuspendTestTask(TaskContext taskContext) + { + _taskContext = taskContext; + } + + public void Dispose() + { + } + + public byte[] Call(byte[] memento) + { + if (!_taskContext.TaskSuspended) + { + _suspendSignal.Wait(); + _taskContext.TaskSuspended = true; + } + + return null; + } + + public void OnNext(ISuspendEvent value) + { + try + { + Assert.Equal(Encoding.UTF8.GetString(value.Message.Value), SuspendMessageFromDriver); + } + finally + { + _suspendSignal.Signal(); + } + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index fa867c1..68fba1b 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -80,6 +80,7 @@ under the License. <Compile Include="Functional\Bridge\TestFailedTaskEventHandler.cs" /> <Compile Include="Functional\Bridge\TestSimpleContext.cs" /> <Compile Include="Functional\Bridge\TestSimpleEventHandlers.cs" /> + <Compile Include="Functional\Bridge\TestSuspendTask.cs" /> <Compile Include="Functional\Driver\DriverTestStartHandler.cs" /> <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriver.cs" /> <Compile Include="Functional\Driver\TestDriver.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java index cae86c4..d307be7 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java @@ -47,6 +47,14 @@ public final class RunningTaskBridge extends NativeBridge implements Identifiabl jrunningTask.send(message); } + public void suspend(final byte[] message) { + if (message != null) { + jrunningTask.suspend(message); + } else { + jrunningTask.suspend(); + } + } + public ActiveContextBridge getActiveContext() { return jactiveContext; }
