Repository: reef Updated Branches: refs/heads/master 246ed1d60 -> 1e2ba9fdb
[REEF-1144] Properly support Context Stacks in .NET This addressed the issue by * Allows the most basic context stacking. JIRA: [REEF-1144](https://issues.apache.org/jira/browse/REEF-1144) This closes #784 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1e2ba9fd Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1e2ba9fd Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1e2ba9fd Branch: refs/heads/master Commit: 1e2ba9fdb06cdf7301ce4fe1be506abfa73c9f9e Parents: 246ed1d Author: Andrew Chung <[email protected]> Authored: Thu Jan 21 13:28:36 2016 -0800 Committer: Julia Wang <[email protected]> Committed: Fri Feb 5 11:17:16 2016 -0800 ---------------------------------------------------------------------- .../ActiveContextClr2Java.cpp | 17 ++ lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 1 + .../Runtime/Evaluator/Context/ContextManager.cs | 2 +- .../Runtime/Evaluator/Context/ContextRuntime.cs | 3 + .../Bridge/Clr2java/IActiveContextClr2Java.cs | 2 + .../Bridge/Events/ActiveContext.cs | 13 +- .../Functional/Bridge/TestContextStack.cs | 199 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 1 + .../reef/javabridge/ActiveContextBridge.java | 11 +- .../common/driver/context/EvaluatorContext.java | 7 +- 10 files changed, 247 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp index 266829d..f5e6bcf 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp @@ -81,6 +81,23 @@ namespace Org { ManagedLog::LOGGER->LogStop("ActiveContextClr2Java::SubmitTask"); } + void ActiveContextClr2Java::SubmitContext(String^ contextConfigStr) { + ManagedLog::LOGGER->LogStart("ActiveContextClr2Java::SubmitContext"); + JNIEnv *env = RetrieveEnv(_jvm); + jclass jclassActiveContext = env->GetObjectClass(_jobjectActiveContext); + jmethodID jmidSubmitContext = env->GetMethodID(jclassActiveContext, "submitContextString", "(Ljava/lang/String;)V"); + + if (jmidSubmitContext == NULL) { + ManagedLog::LOGGER->Log("jmidSubmitContext is NULL"); + return; + } + env->CallObjectMethod( + _jobjectActiveContext, + jmidSubmitContext, + JavaStringFromManagedString(env, contextConfigStr)); + ManagedLog::LOGGER->LogStop("ActiveContextClr2Java::SubmitContext"); + } + void ActiveContextClr2Java::OnError(String^ message) { JNIEnv *env = RetrieveEnv(_jvm); HandleClr2JavaError(env, message, _jobjectActiveContext); http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h index 6aafa35..97dca5c 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h +++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h @@ -83,6 +83,7 @@ namespace Org { ~ActiveContextClr2Java(); !ActiveContextClr2Java(); virtual void SubmitTask(String^ taskConfigStr); + virtual void SubmitContext(String^ contextConfigStr); virtual void Close(); virtual void OnError(String^ message); virtual String^ GetId(); http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs index 520dce9..143c81e 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs @@ -287,7 +287,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context var contextConfiguration = _serializer.FromString(addContextProto.context_configuration); ContextRuntime newTopContext; - if (addContextProto.service_configuration != null) + if (!string.IsNullOrWhiteSpace(addContextProto.service_configuration)) { var serviceConfiguration = new ServiceConfiguration(addContextProto.service_configuration); newTopContext = currentTopContext.SpawnChildContext(contextConfiguration, serviceConfiguration.TangConfig); http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs index 7a78904..21aac8c 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs @@ -382,7 +382,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context if (_childContext.IsPresent()) { _childContext = Optional<ContextRuntime>.Empty(); + return; } + + // To reset a child context, there should always be a child context already present. Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("no child context set"), LOGGER); } } http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs index 9d80335..8319585 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs @@ -24,6 +24,8 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java public interface IActiveContextClr2Java : IClr2Java { void SubmitTask(string taskConfigStr); + + void SubmitContext(string contextConfigStr); void Close(); http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs index 1c3e9ec..b96b238 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs @@ -30,7 +30,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Events [DataContract] internal class ActiveContext : IActiveContext { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(ActiveContext)); + private static readonly Logger Logger = Logger.GetLogger(typeof(ActiveContext)); private readonly AvroConfigurationSerializer _serializer; internal ActiveContext(IActiveContextClr2Java clr2Java) @@ -68,21 +68,24 @@ namespace Org.Apache.REEF.Driver.Bridge.Events public void SubmitTask(IConfiguration taskConfiguration) { - LOGGER.Log(Level.Info, "ActiveContext::SubmitTask"); + Logger.Log(Level.Info, "ActiveContext::SubmitTask"); var task = _serializer.ToString(taskConfiguration); - LOGGER.Log(Level.Verbose, "serialized taskConfiguration: " + task); + Logger.Log(Level.Verbose, "serialized taskConfiguration: " + task); Clr2Java.SubmitTask(task); } public void Dispose() { - LOGGER.Log(Level.Info, "ActiveContext::Dispose"); + Logger.Log(Level.Info, "ActiveContext::Dispose"); Clr2Java.Close(); } public void SubmitContext(IConfiguration contextConfiguration) { - throw new NotImplementedException(); + Logger.Log(Level.Verbose, "ActiveContext::SubmitContext"); + var context = _serializer.ToString(contextConfiguration); + Logger.Log(Level.Verbose, "serialized contextConfiguration: " + contextConfiguration); + Clr2Java.SubmitContext(context); } public void SubmitContextAndService(IConfiguration contextConfiguration, IConfiguration serviceConfiguration) http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs new file mode 100644 index 0000000..50e3c91 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Examples.AllHandlers; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Bridge +{ + [Collection("FunctionalTests")] + public sealed class TestContextStack : ReefFunctionalTest + { + private const string ContextOneId = "Context1"; + private const string ContextTwoId = "Context2"; + private const string TaskValidationMessage = "TaskValidationMessage"; + private const string ClosedContextValidationMessage = "ClosedContextValidationMessage"; + + private static readonly Logger Logger = Logger.GetLogger(typeof(TestContextStack)); + + public TestContextStack() + { + Init(); + } + + /// <summary> + /// Does a simple test of whether a context can be submitted on top of another context. + /// </summary> + [Fact] + public void TestContextStackingOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + CleanUp(testFolder); + TestRun(DriverConfigurations(), typeof(ContextStackHandlers), 1, "testContextStack", "local", testFolder); + ValidateSuccessForLocalRuntime(1, testFolder: testFolder); + ValidateMessageSuccessfullyLogged(TaskValidationMessage, testFolder); + ValidateMessageSuccessfullyLogged(ClosedContextValidationMessage, testFolder); + CleanUp(testFolder); + } + + public IConfiguration DriverConfigurations() + { + var helloDriverConfiguration = DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<ContextStackHandlers>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<ContextStackHandlers>.Class) + .Set(DriverConfiguration.OnContextActive, GenericType<ContextStackHandlers>.Class) + .Set(DriverConfiguration.OnTaskMessage, GenericType<HelloTaskMessageHandler>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<ContextStackHandlers>.Class) + .Set(DriverConfiguration.OnContextClosed, GenericType<ContextStackHandlers>.Class) + .Build(); + + return TangFactory.GetTang().NewConfigurationBuilder(helloDriverConfiguration).Build(); + } + + private sealed class ContextStackHandlers : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IActiveContext>, + IObserver<ICompletedTask>, + IObserver<IClosedContext> + { + private readonly IEvaluatorRequestor _requestor; + private IAllocatedEvaluator _evaluator; + + [Inject] + private ContextStackHandlers(IEvaluatorRequestor evaluatorRequestor) + { + _requestor = evaluatorRequestor; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + value.SubmitContext(Common.Context.ContextConfiguration.ConfigurationModule + .Set(Common.Context.ContextConfiguration.Identifier, ContextOneId) + .Build()); + _evaluator = value; + } + + public void OnNext(IActiveContext value) + { + Logger.Log(Level.Verbose, "ContextId: " + value.Id); + switch (value.Id) + { + case ContextOneId: + var contextConfig = + Common.Context.ContextConfiguration.ConfigurationModule.Set( + Common.Context.ContextConfiguration.Identifier, ContextTwoId) + .Build(); + var stackingContextConfig = + TangFactory.GetTang() + .NewConfigurationBuilder() + .BindImplementation(GenericType<IInjectableInterface>.Class, + GenericType<InjectableInterfaceImpl>.Class) + .Build(); + + value.SubmitContext(Configurations.Merge(stackingContextConfig, contextConfig)); + break; + case ContextTwoId: + value.SubmitTask( + TaskConfiguration.ConfigurationModule.Set(TaskConfiguration.Identifier, "contextStackTestTask") + .Set(TaskConfiguration.Task, GenericType<TestContextStackTask>.Class) + .Build()); + break; + default: + throw new Exception("Unexpected ContextId: " + value.Id); + } + } + + public void OnNext(ICompletedTask value) + { + Logger.Log(Level.Info, TaskValidationMessage); + value.ActiveContext.Dispose(); + } + + public void OnNext(IClosedContext value) + { + // TODO[JIRA REEF-762]: Inspect closing order of contexts. + Logger.Log(Level.Info, ClosedContextValidationMessage); + + // TODO[JIRA REEF-762]: Remove disposal of Evaluator, since it should naturally be closed if no contexts. + _evaluator.Dispose(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + /// <summary> + /// A Task to ensure that an object configured in the second context configuration + /// is properly injected. + /// </summary> + private sealed class TestContextStackTask : ITask + { + [Inject] + private TestContextStackTask(IInjectableInterface injectableInterface) + { + Assert.NotNull(injectableInterface); + Assert.True(injectableInterface is InjectableInterfaceImpl); + } + + public void Dispose() + { + } + + public byte[] Call(byte[] memento) + { + return null; + } + } + + private interface IInjectableInterface + { + } + + private sealed class InjectableInterfaceImpl : IInjectableInterface + { + [Inject] + private InjectableInterfaceImpl() + { + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 759f1ba..95d6076 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -92,6 +92,7 @@ under the License. <ItemGroup> <Compile Include="Functional\Bridge\HelloSimpleEventHandlers.cs" /> <Compile Include="Functional\Bridge\TestBridgeClient.cs" /> + <Compile Include="Functional\Bridge\TestContextStack.cs" /> <Compile Include="Functional\Bridge\TestFailedEvaluatorEventHandler.cs" /> <Compile Include="Functional\Bridge\TestFailedTaskEventHandler.cs" /> <Compile Include="Functional\Bridge\TestSimpleContext.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java index 30a5667..84637f8 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java @@ -18,6 +18,7 @@ */ package org.apache.reef.javabridge; +import org.apache.commons.lang3.StringUtils; import org.apache.reef.annotations.audience.Interop; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.context.ActiveContext; @@ -62,7 +63,7 @@ public final class ActiveContextBridge extends NativeBridge implements Identifia } public void submitTaskString(final String taskConfigurationString) { - if (taskConfigurationString.isEmpty()) { + if (StringUtils.isEmpty(taskConfigurationString)) { throw new RuntimeException("empty taskConfigurationString provided."); } @@ -71,6 +72,14 @@ public final class ActiveContextBridge extends NativeBridge implements Identifia ((EvaluatorContext)jactiveContext).submitTask(taskConfigurationString); } + public void submitContextString(final String contextConfigurationString) { + if (StringUtils.isEmpty(contextConfigurationString)) { + throw new RuntimeException("empty contextConfigurationString provided."); + } + + ((EvaluatorContext)jactiveContext).submitContext(contextConfigurationString); + } + public String getEvaluatorDescriptorString() { final String descriptorString = Utilities.getEvaluatorDescriptorString(jactiveContext.getEvaluatorDescriptor()); LOG.log(Level.FINE, "active context - serialized evaluator descriptor: " + descriptorString); http://git-wip-us.apache.org/repos/asf/reef/blob/1e2ba9fd/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java index 8106713..dbca918 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java @@ -150,12 +150,15 @@ public final class EvaluatorContext implements ActiveContext { @Override public synchronized void submitContext(final Configuration contextConfiguration) { + submitContext(this.configurationSerializer.toString(contextConfiguration)); + } + public synchronized void submitContext(final String contextConf) { if (this.isClosed) { throw new RuntimeException("Active context already closed"); } - LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]", + LOG.log(Level.FINEST, "Submit context: RunningEvaluator id[{0}] for context id[{1}]", new Object[]{getEvaluatorId(), getId()}); final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto = @@ -163,7 +166,7 @@ public final class EvaluatorContext implements ActiveContext { .setAddContext( EvaluatorRuntimeProtocol.AddContextProto.newBuilder() .setParentContextId(getId()) - .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration)) + .setContextConfiguration(contextConf) .build()) .build();
