Repository: reef
Updated Branches:
  refs/heads/master 0318bcbed -> 5123659da


[REEF-1210] Implement SuspendTask on the Driver

JIRA:
  [REEF-1210] https://issues.apache.org/jira/browse/REEF-1210

Pull Request:
  Closes #867


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/5123659d
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/5123659d
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/5123659d

Branch: refs/heads/master
Commit: 5123659dad6c0f95cea4b4f3b6d6d6faf1914af8
Parents: 0318bcb
Author: Andrew Chung <[email protected]>
Authored: Mon Feb 29 15:05:50 2016 -0800
Committer: Julia Wang <[email protected]>
Committed: Tue Mar 8 18:28:53 2016 -0800

----------------------------------------------------------------------
 lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h   |   1 +
 .../RunningTaskClr2Java.cpp                     |  18 +-
 .../Bridge/Clr2java/IRunningTaskClr2Java.cs     |   2 +
 .../Bridge/Events/RunningTask.cs                |   6 +-
 .../Functional/Bridge/TestSuspendTask.cs        | 250 +++++++++++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   1 +
 .../reef/javabridge/RunningTaskBridge.java      |   8 +
 7 files changed, 281 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h 
b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
index 6cd897d..530d14a 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
@@ -141,6 +141,7 @@ namespace Org {
                             virtual IActiveContextClr2Java^ GetActiveContext();
                             virtual String^ GetId();
                             virtual void Send(array<byte>^ message);
+                            virtual void Suspend(array<byte>^ message);
                         };
 
                         public ref class FailedEvaluatorClr2Java : public 
IFailedEvaluatorClr2Java {

http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp 
b/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
index 05475fd..71d6451 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
@@ -81,7 +81,6 @@ namespace Org {
                                                  jclass jclassRunningTask = 
env->GetObjectClass(_jobjectRunningTask);
                                                  jmethodID jmidSend = 
env->GetMethodID(jclassRunningTask, "send", "([B)V");
 
-
                                                  if (jmidSend == NULL) {
                                                          
ManagedLog::LOGGER->Log("jmidSend is NULL");
                                                          return;
@@ -93,6 +92,23 @@ namespace Org {
                                                  
ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Send");
                                          }
 
+                                         // Forwards a suspend request for this running task to the Java-side
+                                         // bridge object by invoking its suspend(byte[]) method through JNI.
+                                         //
+                                         // message: optional payload for the task. It may be nullptr (the managed
+                                         // RunningTask.Suspend() overload passes null); in that case a null byte[]
+                                         // is handed to Java, whose bridge then calls the argument-less suspend().
+                                         void RunningTaskClr2Java::Suspend(array<byte>^ message) {
+                                                 ManagedLog::LOGGER->LogStart("RunningTaskClr2Java::Suspend");
+                                                 JNIEnv *env = RetrieveEnv(_jvm);
+                                                 jclass jclassRunningTask = env->GetObjectClass(_jobjectRunningTask);
+                                                 jmethodID jmidSuspend = env->GetMethodID(jclassRunningTask, "suspend", "([B)V");
+
+                                                 if (jmidSuspend == NULL) {
+                                                         ManagedLog::LOGGER->Log("jmidSuspend is NULL");
+                                                         return;
+                                                 }
+
+                                                 // Only marshal when there is a payload: a missing message must reach
+                                                 // Java as null rather than being dereferenced on the managed side.
+                                                 // A typed jobject is used so the variadic JNI call receives a
+                                                 // pointer-sized null instead of a bare integer 0.
+                                                 jobject jmessage = NULL;
+                                                 if (message != nullptr) {
+                                                         jmessage = JavaByteArrayFromManagedByteArray(env, message);
+                                                 }
+
+                                                 // The target method is declared void ("([B)V"), so it has to be
+                                                 // invoked with CallVoidMethod; CallObjectMethod is only valid for
+                                                 // methods returning an object reference.
+                                                 env->CallVoidMethod(_jobjectRunningTask, jmidSuspend, jmessage);
+                                                 ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Suspend");
+                                         }
+
                                          void 
RunningTaskClr2Java::OnError(String^ message) {
                                                  
ManagedLog::LOGGER->Log("RunningTaskClr2Java::OnError");
                                                  JNIEnv *env = 
RetrieveEnv(_jvm);

http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
index 3a1cb6f..fde12db 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
@@ -27,5 +27,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java
         string GetId();
 
         void Send(byte[] message);
+
+        void Suspend(byte[] message);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
index cc969a6..e993b1c 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
@@ -60,22 +60,20 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
 
         public void Suspend(byte[] message)
         {
-            throw new NotImplementedException();
+            _runningTaskClr2Java.Suspend(message);
         }
 
         public void Suspend()
         {
-            throw new NotImplementedException();
+            Suspend(null);
         }
 
         public void Dispose(byte[] message)
         {
-            throw new NotImplementedException();
         }
 
         public void Dispose()
         {
-            throw new NotImplementedException();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSuspendTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSuspendTask.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSuspendTask.cs
new file mode 100644
index 0000000..69884a6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSuspendTask.cs
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Text;
+using System.Threading;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Events;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Bridge
+{
+    /// <summary>
+    /// Functional test that suspends a running Task from the Driver with a message
+    /// and then runs a second Task on the same context to completion.
+    /// </summary>
+    [Collection("FunctionalTests")]
+    public sealed class TestSuspendTask : ReefFunctionalTest
+    {
+        // Log under this test's own type (previously typeof(TestContextStack), a copy-paste slip).
+        private static readonly Logger Logger = Logger.GetLogger(typeof(TestSuspendTask));
+
+        // Payload the Driver sends along with the suspend request; the Task asserts it receives exactly this.
+        private const string SuspendMessageFromDriver = "SuspendMessageFromDriver";
+
+        // Markers written by the Driver handlers and looked up by ValidateMessageSuccessfullyLogged.
+        private const string SuspendValidationMessage = "SuspendValidationMessage";
+        private const string CompletedValidationMessage = "CompletedValidationmessage";
+
+        public TestSuspendTask()
+        {
+            Init();
+        }
+
+        /// <summary>
+        /// Does a simple test of invoking suspend task with a message from the Driver
+        /// and makes sure the target task receives the suspend message.
+        /// Uses a shared context between both Tasks to record whether the suspend
+        /// message has been received at the Task.
+        /// </summary>
+        [Fact]
+        public void TestSuspendTaskOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
+            CleanUp(testFolder);
+            TestRun(DriverConfigurations(), typeof(SuspendTaskHandlers), 1, "testSuspendTask", "local", testFolder);
+            ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
+            ValidateMessageSuccessfullyLogged(SuspendValidationMessage, testFolder);
+            ValidateMessageSuccessfullyLogged(CompletedValidationMessage, testFolder);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Builds the Driver configuration that registers <see cref="SuspendTaskHandlers"/>
+        /// for every Driver event this test reacts to.
+        /// </summary>
+        /// <returns>The Driver configuration passed to TestRun.</returns>
+        public IConfiguration DriverConfigurations()
+        {
+            var driverConfiguration = DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, GenericType<SuspendTaskHandlers>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<SuspendTaskHandlers>.Class)
+                .Set(DriverConfiguration.OnContextActive, GenericType<SuspendTaskHandlers>.Class)
+                .Set(DriverConfiguration.OnTaskRunning, GenericType<SuspendTaskHandlers>.Class)
+                .Set(DriverConfiguration.OnTaskSuspended, GenericType<SuspendTaskHandlers>.Class)
+                .Set(DriverConfiguration.OnTaskCompleted, GenericType<SuspendTaskHandlers>.Class)
+                .Build();
+
+            return TangFactory.GetTang().NewConfigurationBuilder(driverConfiguration).Build();
+        }
+
+        /// <summary>
+        /// Driver-side event handlers: request an Evaluator, submit a context and a Task,
+        /// suspend the running Task, resubmit a Task after the suspension and
+        /// dispose the context once a Task completes.
+        /// </summary>
+        private sealed class SuspendTaskHandlers :
+            IObserver<IDriverStarted>,
+            IObserver<IAllocatedEvaluator>,
+            IObserver<IActiveContext>,
+            IObserver<ICompletedTask>,
+            IObserver<IRunningTask>,
+            IObserver<ISuspendedTask>
+        {
+            private readonly IEvaluatorRequestor _requestor;
+
+            [Inject]
+            private SuspendTaskHandlers(IEvaluatorRequestor evaluatorRequestor)
+            {
+                _requestor = evaluatorRequestor;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().Build());
+            }
+
+            public void OnNext(IActiveContext value)
+            {
+                // Submit the Task on the first time receiving an active context.
+                value.SubmitTask(GetTaskConfiguration());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                // The context start handler is what pulls TaskContext into the context (see ContextStart).
+                value.SubmitContext(
+                    ContextConfiguration.ConfigurationModule
+                        .Set(ContextConfiguration.Identifier, "ContextID")
+                        .Set(ContextConfiguration.OnContextStart, GenericType<ContextStart>.Class)
+                        .Build());
+            }
+
+            public void OnNext(ICompletedTask value)
+            {
+                // Log on task completion to signal a passed test.
+                Logger.Log(Level.Warning, CompletedValidationMessage);
+                value.ActiveContext.Dispose();
+            }
+
+            public void OnNext(ISuspendedTask value)
+            {
+                // Submit a second Task once the first Task has been successfully suspended
+                // on the same context as the first task.
+                Logger.Log(Level.Warning, SuspendValidationMessage);
+                value.ActiveContext.SubmitTask(GetTaskConfiguration());
+            }
+
+            public void OnNext(IRunningTask value)
+            {
+                // Suspend the first instance of the Task.
+                // NOTE(review): this handler is not guarded, so the second Task is sent a suspend
+                // request as well when it reports running - confirm that is harmless or add a flag.
+                value.Suspend(Encoding.UTF8.GetBytes(SuspendMessageFromDriver));
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        /// <summary>
+        /// Builds the configuration used for both Task submissions: <see cref="SuspendTestTask"/>
+        /// serves as the Task itself and as its suspend handler.
+        /// </summary>
+        private static IConfiguration GetTaskConfiguration()
+        {
+            return TaskConfiguration.ConfigurationModule
+                .Set(TaskConfiguration.Identifier, "TaskID")
+                .Set(TaskConfiguration.Task, GenericType<SuspendTestTask>.Class)
+                .Set(TaskConfiguration.OnSuspend, GenericType<SuspendTestTask>.Class)
+                .Build();
+        }
+
+        /// <summary>
+        /// Context start handler whose only job is to take a <see cref="TaskContext"/> dependency.
+        /// </summary>
+        private sealed class ContextStart : IObserver<IContextStart>
+        {
+            // Never read. Presumably injected here so the TaskContext instance is created at
+            // context level and shared by the Tasks run on it - TODO confirm.
+            private readonly TaskContext _taskContext;
+
+            [Inject]
+            private ContextStart(TaskContext taskContext)
+            {
+                _taskContext = taskContext;
+            }
+
+            public void OnNext(IContextStart value)
+            {
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        /// <summary>
+        /// Holds the flag recording whether a Task has already gone through suspension.
+        /// </summary>
+        private sealed class TaskContext
+        {
+            [Inject]
+            private TaskContext()
+            {
+                TaskSuspended = false;
+            }
+
+            public bool TaskSuspended { get; set; }
+        }
+
+        /// <summary>
+        /// A Task that blocks in Call until it receives a suspend event, verifies the
+        /// suspend message sent by the Driver, and records the suspension in the injected
+        /// <see cref="TaskContext"/> so that a later run returns immediately.
+        /// </summary>
+        private sealed class SuspendTestTask : ITask, IObserver<ISuspendEvent>
+        {
+            private readonly TaskContext _taskContext;
+
+            // Released by the suspend handler to let Call return.
+            private readonly CountdownEvent _suspendSignal = new CountdownEvent(1);
+
+            [Inject]
+            private SuspendTestTask(TaskContext taskContext)
+            {
+                _taskContext = taskContext;
+            }
+
+            public void Dispose()
+            {
+            }
+
+            /// <summary>
+            /// Waits for the suspend signal unless the TaskContext says a suspension already happened.
+            /// </summary>
+            /// <returns>Always null.</returns>
+            public byte[] Call(byte[] memento)
+            {
+                if (!_taskContext.TaskSuspended)
+                {
+                    _suspendSignal.Wait();
+                    _taskContext.TaskSuspended = true;
+                }
+
+                return null;
+            }
+
+            /// <summary>
+            /// Checks that the suspend event carries the Driver's message and unblocks Call.
+            /// </summary>
+            public void OnNext(ISuspendEvent value)
+            {
+                try
+                {
+                    // xUnit expects (expected, actual); the order matters for the failure message.
+                    Assert.Equal(SuspendMessageFromDriver, Encoding.UTF8.GetString(value.Message.Value));
+                }
+                finally
+                {
+                    // Signal even when the assertion fails so Call does not block forever.
+                    _suspendSignal.Signal();
+                }
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index fa867c1..68fba1b 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -80,6 +80,7 @@ under the License.
     <Compile Include="Functional\Bridge\TestFailedTaskEventHandler.cs" />
     <Compile Include="Functional\Bridge\TestSimpleContext.cs" />
     <Compile Include="Functional\Bridge\TestSimpleEventHandlers.cs" />
+    <Compile Include="Functional\Bridge\TestSuspendTask.cs" />
     <Compile Include="Functional\Driver\DriverTestStartHandler.cs" />
     <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriver.cs" />
     <Compile Include="Functional\Driver\TestDriver.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/5123659d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
index cae86c4..d307be7 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
@@ -47,6 +47,14 @@ public final class RunningTaskBridge extends NativeBridge 
implements Identifiabl
     jrunningTask.send(message);
   }
 
+  /**
+   * Suspends the wrapped running task.
+   *
+   * @param message payload to pass along with the suspend request; when null,
+   *                the task is suspended without a message.
+   */
+  public void suspend(final byte[] message) {
+    if (message == null) {
+      jrunningTask.suspend();
+      return;
+    }
+    jrunningTask.suspend(message);
+  }
+
   public ActiveContextBridge getActiveContext() {
     return jactiveContext;
   }

Reply via email to