Repository: incubator-reef Updated Branches: refs/heads/master 8c4bb098c -> 4b2190b13
[REEF-168]: Add ConfigurationProvider support for CLR. This addressed the issue by making IEvaluatorRequestor injectable. JIRA: [REEF-168](https://issues.apache.org/jira/browse/REEF-168). This closes #197. Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/4b2190b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/4b2190b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/4b2190b1 Branch: refs/heads/master Commit: 4b2190b1348ef92184ef59e55d1bbb500835dd87 Parents: 8c4bb09 Author: Beysim Sezgin <[email protected]> Authored: Fri May 29 11:20:15 2015 -0700 Committer: Julia Wang <[email protected]> Committed: Fri May 29 15:37:36 2015 -0700 ---------------------------------------------------------------------- .../cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp | 7 +++--- .../Bridge/BridgeConfigurationProvider.cs | 11 ++++++++-- .../Bridge/ClrHandlerHelper.cs | 3 ++- .../Bridge/ClrSystemHandlerWrapper.cs | 23 ++++++++++++++------ .../Bridge/DriverBridge.cs | 23 ++++++++++---------- .../Bridge/DriverBridgeConfiguration.cs | 1 + .../HelloDriver.cs | 14 +++--------- .../HelloREEF.cs | 1 - .../apache/reef/javabridge/NativeInterop.java | 13 ++++------- .../reef/javabridge/generic/JobDriver.java | 14 ++---------- 10 files changed, 52 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4b2190b1/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp index 2b8c7da..553c611 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp @@ -97,10 +97,10 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_loadClrAsse /* * 
Class: org_apache_reef_javabridge_NativeInterop * Method: CallClrSystemOnStartHandler - * Signature: (Ljava/lang/String;)V + * Signature: (Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)[J */ JNIEXPORT jlongArray JNICALL Java_org_apache_reef_javabridge_NativeInterop_CallClrSystemOnStartHandler -(JNIEnv * env, jclass jclassx, jstring dateTimeString, jstring httpServerPort) { +(JNIEnv * env, jclass jclassx, jstring dateTimeString, jstring httpServerPort, jobject jevaluatorRequestorBridge) { try { ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_CallClrSystemOnStartHandler"); const wchar_t* charConfig = UnicodeCppStringFromJavaString (env, dateTimeString); @@ -112,7 +112,8 @@ JNIEXPORT jlongArray JNICALL Java_org_apache_reef_javabridge_NativeInterop_CallC int lenPort = env->GetStringLength(httpServerPort); String^ strPort = Marshal::PtrToStringUni((IntPtr)(unsigned short*) charPort, lenPort); - array<unsigned long long>^ handlers = ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart(dt, strPort); + EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge); + array<unsigned long long>^ handlers = ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart(dt, strPort, evaluatorRequestorBridge); return JavaLongArrayFromManagedLongArray(env, handlers); } catch (System::Exception^ ex) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4b2190b1/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeConfigurationProvider.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeConfigurationProvider.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeConfigurationProvider.cs index 4f15b7c..a0b3a68 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeConfigurationProvider.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeConfigurationProvider.cs @@ -18,10 +18,12 @@ 
*/ using System.IO; using Org.Apache.REEF.Common.Files; +using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; @@ -125,9 +127,14 @@ namespace Org.Apache.REEF.Driver.Bridge /// Instantiates an IInjector using the bridge configuration. /// </summary> /// <returns></returns> - internal static IInjector GetBridgeInjector() + internal static IInjector GetBridgeInjector(IEvaluatorRequestor evaluatorRequestor) { - return TangFactory.GetTang().NewInjector(GetBridgeConfiguration()); + var injector = TangFactory.GetTang().NewInjector(GetBridgeConfiguration()); + if (evaluatorRequestor != null) + { + injector.BindVolatileInstance(GenericType<IEvaluatorRequestor>.Class, evaluatorRequestor); + } + return injector; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4b2190b1/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs index 6ef87bb..7524d97 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs @@ -23,6 +23,7 @@ using System.Globalization; using System.IO; using System.Linq; using System.Runtime.InteropServices; +using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; @@ -82,7 +83,7 @@ namespace Org.Apache.REEF.Driver.Bridge CommandLineArguments arguments; try { - arguments = BridgeConfigurationProvider.GetBridgeInjector().GetInstance<CommandLineArguments>(); + arguments = 
BridgeConfigurationProvider.GetBridgeInjector(null).GetInstance<CommandLineArguments>(); } catch (InjectionException e) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4b2190b1/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs index 6c92ec7..66b44b7 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs @@ -229,28 +229,35 @@ namespace Org.Apache.REEF.Driver.Bridge } //Deprecate, remove after both Java and C# code gets checked in - public static ulong[] Call_ClrSystemStartHandler_OnStart(DateTime startTime) + public static ulong[] Call_ClrSystemStartHandler_OnStart( + DateTime startTime, + IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java) { + IEvaluatorRequestor evaluatorRequestor = new EvaluatorRequestor(evaluatorRequestorClr2Java); using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart")) { LOGGER.Log(Level.Info, "*** Start time is " + startTime); - return GetHandlers(null); + return GetHandlers(null, evaluatorRequestor); } } - public static ulong[] Call_ClrSystemStartHandler_OnStart(DateTime startTime, string httpServerPort) + public static ulong[] Call_ClrSystemStartHandler_OnStart( + DateTime startTime, + string httpServerPort, + IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java) { + IEvaluatorRequestor evaluatorRequestor = new EvaluatorRequestor(evaluatorRequestorClr2Java); using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart")) { LOGGER.Log(Level.Info, "*** Start time is " + startTime); LOGGER.Log(Level.Info, "*** httpServerPort: " + httpServerPort); - return GetHandlers(httpServerPort); + return GetHandlers(httpServerPort, evaluatorRequestor); } } - 
private static ulong[] GetHandlers(string httpServerPortNumber) + private static ulong[] GetHandlers(string httpServerPortNumber, IEvaluatorRequestor evaluatorRequestor) { - var injector = BridgeConfigurationProvider.GetBridgeInjector(); + var injector = BridgeConfigurationProvider.GetBridgeInjector(evaluatorRequestor); try { @@ -268,7 +275,9 @@ namespace Org.Apache.REEF.Driver.Bridge Exceptions.CaughtAndThrow(e, Level.Error, "Cannot get instance.", LOGGER); } - return _driverBridge.Subscribe(); + var handles = _driverBridge.Subscribe(); + _driverBridge.ObsoleteEvaluatorRequestorOnNext(evaluatorRequestor); + return handles; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4b2190b1/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs index e989199..c8c4d59 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs @@ -40,8 +40,6 @@ namespace Org.Apache.REEF.Driver.Bridge private static ClrSystemHandler<IAllocatedEvaluator> _allocatedEvaluatorSubscriber; - private static ClrSystemHandler<IEvaluatorRequestor> _evaluatorRequestorSubscriber; - private static ClrSystemHandler<ITaskMessage> _taskMessageSubscriber; private static ClrSystemHandler<IActiveContext> _activeContextSubscriber; @@ -166,8 +164,7 @@ namespace Org.Apache.REEF.Driver.Bridge _driverRestartRunningTaskHandlers = driverRestartRunningTaskHandlers; _httpServerHandler = httpServerHandler; _configurationProviders = configurationProviders; - - _evaluatorRequestorSubscriber = new ClrSystemHandler<IEvaluatorRequestor>(); + _allocatedEvaluatorSubscriber = new ClrSystemHandler<IAllocatedEvaluator>(); _completedEvaluatorSubscriber = new ClrSystemHandler<ICompletedEvaluator>(); _taskMessageSubscriber = new 
ClrSystemHandler<ITaskMessage>(); @@ -195,14 +192,6 @@ namespace Org.Apache.REEF.Driver.Bridge _logger.Log(Level.Info, "subscribed to Driver restart handler: " + _driverRestartHandler); handlers[Constants.Handlers[Constants.DriverRestartHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartSubscriber); - // subscribe to Evaluator Requestor - foreach (var handler in _evaluatorRequestHandlers) - { - _evaluatorRequestorSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to IEvaluatorRequestor handler: " + handler); - } - handlers[Constants.Handlers[Constants.EvaluatorRequestorHandler]] = ClrHandlerHelper.CreateHandler(_evaluatorRequestorSubscriber); - // subscribe to Allocated Evaluator foreach (var handler in _allocatedEvaluatorHandlers) { @@ -323,6 +312,16 @@ namespace Org.Apache.REEF.Driver.Bridge return handlers; } + [Obsolete(@"Obsoleted at versioin 0.12 and will be removed at version 0.13. See https://issues.apache.org/jira/browse/REEF-168")] + internal void ObsoleteEvaluatorRequestorOnNext(IEvaluatorRequestor evaluatorRequestor) + { + foreach (var handler in _evaluatorRequestHandlers) + { + handler.OnNext(evaluatorRequestor); + _logger.Log(Level.Info, "called IEvaluatorRequestor handler: " + handler); + } + } + internal ISet<IConfigurationProvider> ConfigurationProviders { get { return _configurationProviders; } } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4b2190b1/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs index d1f53ae..249466a 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs @@ -54,6 +54,7 @@ namespace Org.Apache.REEF.Driver.Bridge /// <summary> /// The event handler for 
requesting evaluator /// </summary> + [Obsolete(@"Obsoleted at versioin 0.12 and will be removed at version 0.13. See https://issues.apache.org/jira/browse/REEF-168")] [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] public static readonly OptionalImpl<IObserver<IEvaluatorRequestor>> OnEvaluatorRequested = new OptionalImpl<IObserver<IEvaluatorRequestor>>(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4b2190b1/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriver.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriver.cs index 8da1cad..3bd1ba8 100644 --- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriver.cs +++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriver.cs @@ -34,7 +34,7 @@ namespace Org.Apache.REEF.Examples.HelloREEF /// <summary> /// The Driver for HelloREEF: It requests a single Evaluator and then submits the HelloTask to it. /// </summary> - public sealed class HelloDriver : IObserver<IAllocatedEvaluator>, IObserver<IEvaluatorRequestor>, IStartHandler + public sealed class HelloDriver : IObserver<IAllocatedEvaluator>, IStartHandler { /// <summary> /// Contexts contain configuration data used beyond a single task. @@ -55,10 +55,11 @@ namespace Org.Apache.REEF.Examples.HelloREEF private readonly REEFFileNames _fileNames; [Inject] - private HelloDriver(REEFFileNames fileNames) + private HelloDriver(REEFFileNames fileNames, IEvaluatorRequestor evaluatorRequestor) { _fileNames = fileNames; ClrHandlerHelper.GenerateClassHierarchy(GetGlobalAssemblies()); + evaluatorRequestor.Submit(new EvaluatorRequest(number: 1, megaBytes: 64)); } /// <summary> @@ -79,15 +80,6 @@ namespace Org.Apache.REEF.Examples.HelloREEF { } - /// <summary> - /// Ask for one Evaluator with 64MB of memory. 
- /// </summary> - /// <param name="evaluatorRequestor"></param> - public void OnNext(IEvaluatorRequestor evaluatorRequestor) - { - evaluatorRequestor.Submit(new EvaluatorRequest(number:1, megaBytes:64)); - } - public string Identifier { get; set; } /// <summary> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4b2190b1/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs index 87a48e3..41b0d35 100644 --- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs +++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs @@ -54,7 +54,6 @@ namespace Org.Apache.REEF.Examples.HelloREEF { // The driver configuration contains all the needed bindings. var helloDriverConfiguration = DriverBridgeConfiguration.ConfigurationModule - .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<HelloDriver>.Class) .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<HelloDriver>.Class) .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<HelloDriver>.Class) .Build(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4b2190b1/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java index 9fe61c1..ef208aa 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java @@ -23,7 +23,6 @@ import java.util.HashMap; public class NativeInterop { public static final String CLASS_HIERARCHY_FILENAME = "clrClassHierarchy.bin"; public 
static final String GLOBAL_LIBRARIES_FILENAME = "userSuppliedGlobalLibraries.txt"; - public static final String EvaluatorRequestorKey = "EvaluatorRequestor"; public static final String AllocatedEvaluatorKey = "AllocatedEvaluator"; public static final String ActiveContextKey = "ActiveContext"; public static final String TaskMessageKey = "TaskMessage"; @@ -42,7 +41,6 @@ public class NativeInterop { public static final String DriverRestartRunningTaskKey = "DriverRestartRunningTask"; public static final HashMap<String, Integer> Handlers = new HashMap<String, Integer>() { { - put(EvaluatorRequestorKey, 0); put(AllocatedEvaluatorKey, 1); put(ActiveContextKey, 2); put(TaskMessageKey, 3); @@ -68,7 +66,10 @@ public class NativeInterop { public static native void ClrBufferedLog(int level, String message); - public static native long[] CallClrSystemOnStartHandler(String dateTime, String httpServerPortNumber); + public static native long[] CallClrSystemOnStartHandler( + String dateTime, + String httpServerPortNumber, + EvaluatorRequestorBridge javaEvaluatorRequestorBridge); public static native void ClrSystemAllocatedEvaluatorHandlerOnNext( long handle, @@ -82,12 +83,6 @@ public class NativeInterop { InteropLogger interopLogger ); - public static native void ClrSystemEvaluatorRequstorHandlerOnNext( - long handle, - EvaluatorRequestorBridge javaEvluatorRequstorBridge, - InteropLogger interopLogger - ); - public static native void ClrSystemTaskMessageHandlerOnNext( long handle, byte[] mesage, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4b2190b1/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java index 1e02349..0d4a211 100644 --- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java @@ -117,7 +117,6 @@ public final class JobDriver { */ private final LoggingScopeFactory loggingScopeFactory; - private long evaluatorRequestorHandler = 0; private long allocatedEvaluatorHandler = 0; private long activeContextHandler = 0; private long taskMessageHandler = 0; @@ -193,7 +192,8 @@ public final class JobDriver { LOG.log(Level.INFO, "StartTime: {0}", new Object[]{startTime}); String portNumber = httpServer == null ? null : Integer.toString((httpServer.getPort())); - long[] handlers = NativeInterop.CallClrSystemOnStartHandler(startTime.toString(), portNumber); + EvaluatorRequestorBridge evaluatorRequestorBridge = new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory); + long[] handlers = NativeInterop.CallClrSystemOnStartHandler(startTime.toString(), portNumber, evaluatorRequestorBridge); if (handlers != null) { if (handlers.length != NativeInterop.nHandlers) { throw new RuntimeException( @@ -201,7 +201,6 @@ public final class JobDriver { String.valueOf(handlers.length), String.valueOf(NativeInterop.nHandlers))); } - this.evaluatorRequestorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.EvaluatorRequestorKey)]; this.allocatedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.AllocatedEvaluatorKey)]; this.activeContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ActiveContextKey)]; this.taskMessageHandler = handlers[NativeInterop.Handlers.get(NativeInterop.TaskMessageKey)]; @@ -548,16 +547,7 @@ public final class JobDriver { synchronized (JobDriver.this) { setupBridge(startTime); - LOG.log(Level.INFO, "Driver Started"); - - if (JobDriver.this.evaluatorRequestorHandler == 0) { - throw new RuntimeException("Evaluator Requestor Handler not initialized by CLR."); - } - EvaluatorRequestorBridge 
evaluatorRequestorBridge = new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory); - NativeInterop.ClrSystemEvaluatorRequstorHandlerOnNext(JobDriver.this.evaluatorRequestorHandler, evaluatorRequestorBridge, JobDriver.this.interopLogger); - // get the evaluator numbers set by CLR handler - LOG.log(Level.INFO, "evaluator requested at start up: " + evaluatorRequestorBridge.getEvaluatorNumber()); } } }
