Repository: reef
Updated Branches:
  refs/heads/master 246ed1d60 -> 1e2ba9fdb


[REEF-1144] Properly support Context Stacks in .NET

This addressed the issue by
  * Allows the most basic context stacking.

JIRA:
  [REEF-1144](https://issues.apache.org/jira/browse/REEF-1144)

This closes #784


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1e2ba9fd
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1e2ba9fd
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1e2ba9fd

Branch: refs/heads/master
Commit: 1e2ba9fdb06cdf7301ce4fe1be506abfa73c9f9e
Parents: 246ed1d
Author: Andrew Chung <[email protected]>
Authored: Thu Jan 21 13:28:36 2016 -0800
Committer: Julia Wang <[email protected]>
Committed: Fri Feb 5 11:17:16 2016 -0800

----------------------------------------------------------------------
 .../ActiveContextClr2Java.cpp                   |  17 ++
 lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h   |   1 +
 .../Runtime/Evaluator/Context/ContextManager.cs |   2 +-
 .../Runtime/Evaluator/Context/ContextRuntime.cs |   3 +
 .../Bridge/Clr2java/IActiveContextClr2Java.cs   |   2 +
 .../Bridge/Events/ActiveContext.cs              |  13 +-
 .../Functional/Bridge/TestContextStack.cs       | 199 +++++++++++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   1 +
 .../reef/javabridge/ActiveContextBridge.java    |  11 +-
 .../common/driver/context/EvaluatorContext.java |   7 +-
 10 files changed, 247 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp 
b/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp
index 266829d..f5e6bcf 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp
@@ -81,6 +81,23 @@ namespace Org {
                                                        
ManagedLog::LOGGER->LogStop("ActiveContextClr2Java::SubmitTask");
                                                }
 
+                                               void 
ActiveContextClr2Java::SubmitContext(String^ contextConfigStr) {
+                                                       
ManagedLog::LOGGER->LogStart("ActiveContextClr2Java::SubmitContext");
+                                                       JNIEnv *env = 
RetrieveEnv(_jvm);
+                                                       jclass 
jclassActiveContext = env->GetObjectClass(_jobjectActiveContext);
+                                                       jmethodID 
jmidSubmitContext = env->GetMethodID(jclassActiveContext, 
"submitContextString", "(Ljava/lang/String;)V");
+
+                                                       if (jmidSubmitContext 
== NULL) {
+                                                               
ManagedLog::LOGGER->Log("jmidSubmitContext is NULL");
+                                                               return;
+                                                       }
+                                                       env->CallObjectMethod(
+                                                               
_jobjectActiveContext,
+                                                               
jmidSubmitContext,
+                                                               
JavaStringFromManagedString(env, contextConfigStr));
+                                                       
ManagedLog::LOGGER->LogStop("ActiveContextClr2Java::SubmitContext");
+                                               }
+
                                                void 
ActiveContextClr2Java::OnError(String^ message) {
                                                        JNIEnv *env = 
RetrieveEnv(_jvm);
                                                        
HandleClr2JavaError(env, message, _jobjectActiveContext);

http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h 
b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
index 6aafa35..97dca5c 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
@@ -83,6 +83,7 @@ namespace Org {
                             ~ActiveContextClr2Java();
                             !ActiveContextClr2Java();
                             virtual void SubmitTask(String^ taskConfigStr);
+                            virtual void SubmitContext(String^ 
contextConfigStr);
                             virtual void Close();
                             virtual void OnError(String^ message);
                             virtual String^ GetId();

http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
index 520dce9..143c81e 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
@@ -287,7 +287,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 var contextConfiguration = 
_serializer.FromString(addContextProto.context_configuration);
 
                 ContextRuntime newTopContext;
-                if (addContextProto.service_configuration != null)
+                if 
(!string.IsNullOrWhiteSpace(addContextProto.service_configuration))
                 {
                     var serviceConfiguration = new 
ServiceConfiguration(addContextProto.service_configuration);
                     newTopContext = 
currentTopContext.SpawnChildContext(contextConfiguration, 
serviceConfiguration.TangConfig);

http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
index 7a78904..21aac8c 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
@@ -382,7 +382,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 if (_childContext.IsPresent())
                 {
                     _childContext = Optional<ContextRuntime>.Empty();
+                    return;
                 }
+
+                // To reset a child context, there should always be a child 
context already present.
                 Utilities.Diagnostics.Exceptions.Throw(new 
InvalidOperationException("no child context set"), LOGGER);
             }
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs
index 9d80335..8319585 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs
@@ -24,6 +24,8 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java
     public interface IActiveContextClr2Java : IClr2Java
     {
         void SubmitTask(string taskConfigStr);
+
+        void SubmitContext(string contextConfigStr);
         
         void Close();
 

http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs
index 1c3e9ec..b96b238 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs
@@ -30,7 +30,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
     [DataContract]
     internal class ActiveContext : IActiveContext
     {
-        private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(ActiveContext));
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(ActiveContext));
         private readonly AvroConfigurationSerializer _serializer;
 
         internal ActiveContext(IActiveContextClr2Java clr2Java)
@@ -68,21 +68,24 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
 
         public void SubmitTask(IConfiguration taskConfiguration)
         {
-            LOGGER.Log(Level.Info, "ActiveContext::SubmitTask");
+            Logger.Log(Level.Info, "ActiveContext::SubmitTask");
             var task = _serializer.ToString(taskConfiguration);
-            LOGGER.Log(Level.Verbose, "serialized taskConfiguration: " + task);
+            Logger.Log(Level.Verbose, "serialized taskConfiguration: " + task);
             Clr2Java.SubmitTask(task);
         }
 
         public void Dispose()
         {
-            LOGGER.Log(Level.Info, "ActiveContext::Dispose");
+            Logger.Log(Level.Info, "ActiveContext::Dispose");
             Clr2Java.Close();
         }
 
         public void SubmitContext(IConfiguration contextConfiguration)
         {
-            throw new NotImplementedException();
+            Logger.Log(Level.Verbose, "ActiveContext::SubmitContext");
+            var context = _serializer.ToString(contextConfiguration);
+            Logger.Log(Level.Verbose, "serialized contextConfiguration: " + 
contextConfiguration);
+            Clr2Java.SubmitContext(context);
         }
 
         public void SubmitContextAndService(IConfiguration 
contextConfiguration, IConfiguration serviceConfiguration)

http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
new file mode 100644
index 0000000..50e3c91
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Examples.AllHandlers;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Bridge
+{
+    [Collection("FunctionalTests")]
+    public sealed class TestContextStack : ReefFunctionalTest
+    {
+        private const string ContextOneId = "Context1";
+        private const string ContextTwoId = "Context2";
+        private const string TaskValidationMessage = "TaskValidationMessage";
+        private const string ClosedContextValidationMessage = 
"ClosedContextValidationMessage";
+
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(TestContextStack));
+
+        public TestContextStack()
+        {
+            Init();
+        }
+
+        /// <summary>
+        /// Does a simple test of whether a context can be submitted on top of 
another context.
+        /// </summary>
+        [Fact]
+        public void TestContextStackingOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
+            CleanUp(testFolder);
+            TestRun(DriverConfigurations(), typeof(ContextStackHandlers), 1, 
"testContextStack", "local", testFolder);
+            ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
+            ValidateMessageSuccessfullyLogged(TaskValidationMessage, 
testFolder);
+            ValidateMessageSuccessfullyLogged(ClosedContextValidationMessage, 
testFolder);
+            CleanUp(testFolder);
+        }
+
+        public IConfiguration DriverConfigurations()
+        {
+            var helloDriverConfiguration = 
DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, 
GenericType<ContextStackHandlers>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<ContextStackHandlers>.Class)
+                .Set(DriverConfiguration.OnContextActive, 
GenericType<ContextStackHandlers>.Class)
+                .Set(DriverConfiguration.OnTaskMessage, 
GenericType<HelloTaskMessageHandler>.Class)
+                .Set(DriverConfiguration.OnTaskCompleted, 
GenericType<ContextStackHandlers>.Class)
+                .Set(DriverConfiguration.OnContextClosed, 
GenericType<ContextStackHandlers>.Class)
+                .Build();
+
+            return 
TangFactory.GetTang().NewConfigurationBuilder(helloDriverConfiguration).Build();
+        }
+
+        private sealed class ContextStackHandlers : 
+            IObserver<IDriverStarted>,
+            IObserver<IAllocatedEvaluator>,
+            IObserver<IActiveContext>,
+            IObserver<ICompletedTask>,
+            IObserver<IClosedContext>
+        {
+            private readonly IEvaluatorRequestor _requestor;
+            private IAllocatedEvaluator _evaluator;
+
+            [Inject]
+            private ContextStackHandlers(IEvaluatorRequestor 
evaluatorRequestor)
+            {
+                _requestor = evaluatorRequestor;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().Build());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                
value.SubmitContext(Common.Context.ContextConfiguration.ConfigurationModule
+                    .Set(Common.Context.ContextConfiguration.Identifier, 
ContextOneId)
+                    .Build());
+                _evaluator = value;
+            }
+
+            public void OnNext(IActiveContext value)
+            {
+                Logger.Log(Level.Verbose, "ContextId: " + value.Id);
+                switch (value.Id)
+                {
+                    case ContextOneId:
+                        var contextConfig =
+                            
Common.Context.ContextConfiguration.ConfigurationModule.Set(
+                                
Common.Context.ContextConfiguration.Identifier, ContextTwoId)
+                                .Build();
+                        var stackingContextConfig =
+                            TangFactory.GetTang()
+                                .NewConfigurationBuilder()
+                                
.BindImplementation(GenericType<IInjectableInterface>.Class,
+                                    GenericType<InjectableInterfaceImpl>.Class)
+                                    .Build();
+
+                        
value.SubmitContext(Configurations.Merge(stackingContextConfig, contextConfig));
+                        break;
+                    case ContextTwoId:
+                        value.SubmitTask(
+                            
TaskConfiguration.ConfigurationModule.Set(TaskConfiguration.Identifier, 
"contextStackTestTask")
+                                .Set(TaskConfiguration.Task, 
GenericType<TestContextStackTask>.Class)
+                                .Build());
+                        break;
+                    default:
+                        throw new Exception("Unexpected ContextId: " + 
value.Id);
+                }
+            }
+
+            public void OnNext(ICompletedTask value)
+            {
+                Logger.Log(Level.Info, TaskValidationMessage);
+                value.ActiveContext.Dispose();
+            }
+
+            public void OnNext(IClosedContext value)
+            {
+                // TODO[JIRA REEF-762]: Inspect closing order of contexts.
+                Logger.Log(Level.Info, ClosedContextValidationMessage);
+
+                // TODO[JIRA REEF-762]: Remove disposal of Evaluator, since it 
should naturally be closed if no contexts.
+                _evaluator.Dispose();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        /// <summary>
+        /// A Task to ensure that an object configured in the second context 
configuration 
+        /// is properly injected.
+        /// </summary>
+        private sealed class TestContextStackTask : ITask
+        {
+            [Inject]
+            private TestContextStackTask(IInjectableInterface 
injectableInterface)
+            {
+                Assert.NotNull(injectableInterface);
+                Assert.True(injectableInterface is InjectableInterfaceImpl);
+            }
+
+            public void Dispose()
+            {
+            }
+
+            public byte[] Call(byte[] memento)
+            {
+                return null;
+            }
+        }
+
+        private interface IInjectableInterface
+        {
+        }
+
+        private sealed class InjectableInterfaceImpl : IInjectableInterface
+        {
+            [Inject]
+            private InjectableInterfaceImpl()
+            {
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 759f1ba..95d6076 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -92,6 +92,7 @@ under the License.
   <ItemGroup>
     <Compile Include="Functional\Bridge\HelloSimpleEventHandlers.cs" />
     <Compile Include="Functional\Bridge\TestBridgeClient.cs" />
+    <Compile Include="Functional\Bridge\TestContextStack.cs" />
     <Compile Include="Functional\Bridge\TestFailedEvaluatorEventHandler.cs" />
     <Compile Include="Functional\Bridge\TestFailedTaskEventHandler.cs" />
     <Compile Include="Functional\Bridge\TestSimpleContext.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
index 30a5667..84637f8 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.javabridge;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.reef.annotations.audience.Interop;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.context.ActiveContext;
@@ -62,7 +63,7 @@ public final class ActiveContextBridge extends NativeBridge 
implements Identifia
   }
 
   public void submitTaskString(final String taskConfigurationString) {
-    if (taskConfigurationString.isEmpty()) {
+    if (StringUtils.isEmpty(taskConfigurationString)) {
       throw new RuntimeException("empty taskConfigurationString provided.");
     }
 
@@ -71,6 +72,14 @@ public final class ActiveContextBridge extends NativeBridge 
implements Identifia
     ((EvaluatorContext)jactiveContext).submitTask(taskConfigurationString);
   }
 
+  public void submitContextString(final String contextConfigurationString) {
+    if (StringUtils.isEmpty(contextConfigurationString)) {
+      throw new RuntimeException("empty contextConfigurationString provided.");
+    }
+
+    
((EvaluatorContext)jactiveContext).submitContext(contextConfigurationString);
+  }
+
   public String getEvaluatorDescriptorString() {
     final String descriptorString = 
Utilities.getEvaluatorDescriptorString(jactiveContext.getEvaluatorDescriptor());
     LOG.log(Level.FINE, "active context - serialized evaluator descriptor: " + 
descriptorString);

http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
index 8106713..dbca918 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
@@ -150,12 +150,15 @@ public final class EvaluatorContext implements 
ActiveContext {
 
   @Override
   public synchronized void submitContext(final Configuration 
contextConfiguration) {
+    submitContext(this.configurationSerializer.toString(contextConfiguration));
+  }
 
+  public synchronized void submitContext(final String contextConf) {
     if (this.isClosed) {
       throw new RuntimeException("Active context already closed");
     }
 
-    LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for 
context id[{1}]",
+    LOG.log(Level.FINEST, "Submit context: RunningEvaluator id[{0}] for 
context id[{1}]",
         new Object[]{getEvaluatorId(), getId()});
 
     final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
@@ -163,7 +166,7 @@ public final class EvaluatorContext implements 
ActiveContext {
             .setAddContext(
                 EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
                     .setParentContextId(getId())
-                    
.setContextConfiguration(this.configurationSerializer.toString(contextConfiguration))
+                    .setContextConfiguration(contextConf)
                     .build())
             .build();
 

Reply via email to