Repository: reef Updated Branches: refs/heads/master e8b7c08f0 -> 37d1891c6
[REEF-1521] Decouple bridge handler setup and DriverStart event delegation This addressed the issue by * adding a native method solely for setting up the Java & .NET bridge handler * removing bridge setup code from the StartHandler/RestartHandler native methods * decoupling start/restart event trigger code from bridge handler setup * moving start/restart event trigger code out of `synchronized` blocks * adding a test for checking that start/restart event handlers do not block other event handlers * removing the ClrHandlersInitializer interface and its implementations for simplicity JIRA: [REEF-1521](https://issues.apache.org/jira/browse/REEF-1521) Pull Request: This closes #1095 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/37d1891c Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/37d1891c Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/37d1891c Branch: refs/heads/master Commit: 37d1891c60de25e7cb92cdf50ed722b692991acf Parents: e8b7c08 Author: Jason (Joo Seong) Jeong <[email protected]> Authored: Thu Aug 11 16:53:46 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue Sep 27 10:41:12 2016 -0700 ---------------------------------------------------------------------- .../cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp | 48 ++++++--- .../Bridge/ClrSystemHandlerWrapper.cs | 46 ++++---- .../Functional/Bridge/TestDriverConcurrency.cs | 106 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 1 + .../apache/reef/javabridge/NativeInterop.java | 15 ++- .../generic/ClrHandlersInitializer.java | 39 ------- .../DriverRestartClrHandlersInitializer.java | 53 ---------- .../DriverStartClrHandlersInitializer.java | 52 --------- .../reef/javabridge/generic/JobDriver.java | 27 +++-- 9 files changed, 191 insertions(+), 196 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/37d1891c/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp index e7b272a..25a4c77 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp @@ -134,19 +134,14 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_loadClrAsse /* * Class: org_apache_reef_javabridge_NativeInterop * Method: callClrSystemOnStartHandler - * Signature: (Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V + * Signature: ()V */ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler -(JNIEnv * env, jclass jclassx, jstring dateTimeString, jstring httpServerPort, jobject jbridgeHandlerManager, jobject jevaluatorRequestorBridge) { +(JNIEnv * env, jclass jclassx) { try { ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler"); DateTime dt = DateTime::Now; - - String^ strPort = ManagedStringFromJavaString(env, httpServerPort); - - EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge); - BridgeHandlerManager^ handlerManager = ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart(dt, strPort, evaluatorRequestorBridge); - populateJavaBridgeHandlerManager(env, jbridgeHandlerManager, handlerManager); + ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart(dt); } catch (System::Exception^ ex) { // we cannot get error back to java here since we don't have an object to call back (although we ideally should...) @@ -470,18 +465,14 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemCo /* * Class: org_apache_reef_javabridge_NativeInterop * Method: callClrSystemOnRestartHandler - * Signature: (Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)V + * Signature: (Lorg/apache/reef/javabridge/DriverRestartedBridge;)V */ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler -(JNIEnv * env, jclass jclassx, jstring httpServerPort, jobject jbridgeHandlerManager, jobject jevaluatorRequestorBridge, jobject jdriverRestartedBridge) { +(JNIEnv * env, jclass jclassx, jobject jdriverRestartedBridge) { try { ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler"); - String^ strPort = ManagedStringFromJavaString(env, httpServerPort); - - EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge); DriverRestartedClr2Java^ driverRestartedBridge = gcnew DriverRestartedClr2Java(env, jdriverRestartedBridge); - BridgeHandlerManager^ handlerManager = ClrSystemHandlerWrapper::Call_ClrSystemRestartHandler_OnRestart(strPort, evaluatorRequestorBridge, driverRestartedBridge); - populateJavaBridgeHandlerManager(env, jbridgeHandlerManager, handlerManager); + ClrSystemHandlerWrapper::Call_ClrSystemRestartHandler_OnRestart(driverRestartedBridge); } catch (System::Exception^ ex) { // we cannot get error back to java here since we don't have an object to call back (although we ideally should...) @@ -581,12 +572,32 @@ JNIEXPORT jfloat JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystem } } +/* + * Class: org_apache_reef_javabridge_NativeInterop + * Method: clrSystemSetupBridgeHandlerManager + * Signature: (Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V + */ +JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemSetupBridgeHandlerManager +(JNIEnv * env, jclass cls, jstring httpServerPort, jobject jbridgeHandlerManager, jobject jevaluatorRequestorBridge) { + try { + ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_clrSystemSetupBridgeHandlerManager"); + + String^ strPort = ManagedStringFromJavaString(env, httpServerPort); + EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge); + BridgeHandlerManager^ handlerManager = ClrSystemHandlerWrapper::Call_ClrSystem_SetupBridgeHandlerManager(strPort, evaluatorRequestorBridge); + populateJavaBridgeHandlerManager(env, jbridgeHandlerManager, handlerManager); + } + catch (System::Exception^ ex) { + ManagedLog::LOGGER->LogError("Exceptions in Java_org_apache_reef_javabridge_NativeInterop_clrSystemSetupBridgeHandlerManager", ex); + } +} + static JNINativeMethod methods[] = { { "loadClrAssembly", "(Ljava/lang/String;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_loadClrAssembly }, { "clrBufferedLog", "(ILjava/lang/String;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrBufferedLog }, - { "callClrSystemOnStartHandler", "(Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V", + { "callClrSystemOnStartHandler", "()V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler }, { "clrSystemAllocatedEvaluatorHandlerOnNext", "(JLorg/apache/reef/javabridge/AllocatedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V", @@ -628,7 +639,7 @@ static JNINativeMethod methods[] = { { "clrSystemContextMessageHandlerOnNext", "(JLorg/apache/reef/javabridge/ContextMessageBridge;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemContextMessageHandlerOnNext }, - { "callClrSystemOnRestartHandler", "(Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)V", + { "callClrSystemOnRestartHandler", "(Lorg/apache/reef/javabridge/DriverRestartedBridge;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler }, { "clrSystemDriverRestartActiveContextHandlerOnNext", "(JLorg/apache/reef/javabridge/ActiveContextBridge;)V", @@ -645,6 +656,9 @@ static JNINativeMethod methods[] = { { "clrSystemProgressProviderGetProgress", "(J)F", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemProgressProviderGetProgress }, + + { "clrSystemSetupBridgeHandlerManager", "(Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V", + (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemSetupBridgeHandlerManager }, }; JNIEXPORT void JNICALL http://git-wip-us.apache.org/repos/asf/reef/blob/37d1891c/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs index 9f3497f..0895aa0 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs @@ -245,37 +245,47 @@ namespace Org.Apache.REEF.Driver.Bridge } } - public static BridgeHandlerManager Call_ClrSystemStartHandler_OnStart( - DateTime startTime, - string httpServerPort, - IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java) + /// <summary> + /// Invokes event handlers registered to the driver start event. + /// </summary> + /// <param name="startTime"><see cref="DateTime"/> object that represents when this method was called.</param> + public static void Call_ClrSystemStartHandler_OnStart(DateTime startTime) { - IEvaluatorRequestor evaluatorRequestor = new EvaluatorRequestor(evaluatorRequestorClr2Java); using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart")) { LOGGER.Log(Level.Info, "*** Start time is " + startTime); - LOGGER.Log(Level.Info, "*** httpServerPort: " + httpServerPort); - var handlers = GetHandlers(httpServerPort, evaluatorRequestor); _driverBridge.StartHandlersOnNext(startTime); - - return handlers; - } + } } - public static BridgeHandlerManager Call_ClrSystemRestartHandler_OnRestart( - string httpServerPort, - IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java, - IDriverRestartedClr2Java driverRestartedClr2Java) + /// <summary> + /// Invokes event handlers registered to the driver restart event. + /// </summary> + /// <param name="driverRestartedClr2Java">Proxy object to the Java driver restart event object.</param> + public static void Call_ClrSystemRestartHandler_OnRestart(IDriverRestartedClr2Java driverRestartedClr2Java) { - IEvaluatorRequestor evaluatorRequestor = new EvaluatorRequestor(evaluatorRequestorClr2Java); using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemRestartHandler_OnRestart")) { LOGGER.Log(Level.Info, "*** Restart time is " + driverRestartedClr2Java.GetStartTime()); - LOGGER.Log(Level.Info, "*** httpServerPort: " + httpServerPort); - var handlers = GetHandlers(httpServerPort, evaluatorRequestor); _driverBridge.RestartHandlerOnNext(driverRestartedClr2Java); + } + } - return handlers; + /// <summary> + /// Configure and return a manager object holding all subscriptions given to REEF events on the .NET side. + /// </summary> + /// <param name="httpServerPort">String representation of the http port of the Java-side driver.</param> + /// <param name="evaluatorRequestorClr2Java">Proxy object to the Java evaluator requestor object.</param> + /// <returns><see cref="BridgeHandlerManager"/> object that contains .NET handles for each REEF event.</returns> + public static BridgeHandlerManager Call_ClrSystem_SetupBridgeHandlerManager( + string httpServerPort, + IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java) + { + IEvaluatorRequestor evaluatorRequestor = new EvaluatorRequestor(evaluatorRequestorClr2Java); + using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystem_SetupBridgeHandlerManager")) + { + LOGGER.Log(Level.Info, "*** httpServerPort: " + httpServerPort); + return GetHandlers(httpServerPort, evaluatorRequestor); } } http://git-wip-us.apache.org/repos/asf/reef/blob/37d1891c/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDriverConcurrency.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDriverConcurrency.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDriverConcurrency.cs new file mode 100644 index 0000000..0c3581f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDriverConcurrency.cs @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Threading; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Bridge +{ + /// <summary> + /// Test class containing tests related to the concurrency of the .NET driver. + /// </summary> + [Collection("FunctionalTests")] + public sealed class TestDriverConcurrency : ReefFunctionalTest + { + private const int EvalNum = 1; + private const int DriverWaitTimeoutMilliseconds = 60000; + + /// <summary> + /// Check that event handlers for the driver start event and the evaluator allocated event can be run concurrently. + /// </summary> + [Fact] + public void TestStartHandlerAndEvalAllocatedHandlerOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(GetDriverConfiguration(), + typeof(DriverConcurrencyTestDriver), + EvalNum, + "TestStartHandlerAndEvalAllocatedHandlerOnLocalRuntime", + "local", + testFolder); + ValidateSuccessForLocalRuntime(0, testFolder: testFolder); + CleanUp(testFolder); + } + + private IConfiguration GetDriverConfiguration() + { + return DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<DriverConcurrencyTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<DriverConcurrencyTestDriver>.Class) + .Build(); + } + + private sealed class DriverConcurrencyTestDriver : IObserver<IDriverStarted>, IObserver<IAllocatedEvaluator> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(DriverConcurrencyTestDriver)); + private readonly IEvaluatorRequestor _evaluatorRequestor; + private readonly CountdownEvent _countdownEvent; + + [Inject] + private DriverConcurrencyTestDriver(IEvaluatorRequestor evaluatorRequestor) + { + _evaluatorRequestor = evaluatorRequestor; + _countdownEvent = new CountdownEvent(EvalNum); + } + + public void OnNext(IDriverStarted driverStarted) + { + Logger.Log(Level.Info, "Requesting {0} evaluators.", EvalNum); + _evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder() + .SetNumber(EvalNum) + .Build()); + + // wait until the expected number of evaluator allocated events have fired + Assert.True(_countdownEvent.Wait(DriverWaitTimeoutMilliseconds)); + } + + public void OnNext(IAllocatedEvaluator allocatedEvaluator) + { + Logger.Log(Level.Info, "Trigger {0} and close {1}", _countdownEvent, allocatedEvaluator.Id); + _countdownEvent.Signal(); + allocatedEvaluator.Dispose(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/37d1891c/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index d043ab1..4774eb0 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -78,6 +78,7 @@ under the License. <Compile Include="Functional\Bridge\TestBridgeClient.cs" /> <Compile Include="Functional\Bridge\TestCloseTask.cs" /> <Compile Include="Functional\Bridge\TestContextStack.cs" /> + <Compile Include="Functional\Bridge\TestDriverConcurrency.cs" /> <Compile Include="Functional\Bridge\TestFailedEvaluatorEventHandler.cs" /> <Compile Include="Functional\Common\Task\ExceptionTask.cs" /> <Compile Include="Functional\Common\Task\NullTask.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/37d1891c/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java index 6de53ef..83f783f 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java @@ -35,11 +35,7 @@ public final class NativeInterop { public static native void clrBufferedLog(final int level, final String message); - public static native void callClrSystemOnStartHandler( - final String dateTime, - final String httpServerPortNumber, - final BridgeHandlerManager bridgeHandlerManager, - final EvaluatorRequestorBridge javaEvaluatorRequestorBridge); + public static native void callClrSystemOnStartHandler(); public static native void clrSystemAllocatedEvaluatorHandlerOnNext( final long handle, @@ -116,9 +112,6 @@ public final class NativeInterop { ); public static native void callClrSystemOnRestartHandler( - final String httpServerPortNumber, - final BridgeHandlerManager bridgeHandlerManager, - final EvaluatorRequestorBridge javaEvaluatorRequestorBridge, final DriverRestartedBridge driverRestartedBridge ); @@ -145,6 +138,12 @@ public final class NativeInterop { public static native float clrSystemProgressProviderGetProgress(final long handle); + public static native void clrSystemSetupBridgeHandlerManager( + final String httpServerPortNumber, + final BridgeHandlerManager bridgeHandlerManager, + final EvaluatorRequestorBridge javaEvaluatorRequestorBridge + ); + /** * Empty private constructor to prohibit instantiation of utility class. */ http://git-wip-us.apache.org/repos/asf/reef/blob/37d1891c/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java deleted file mode 100644 index a151fd1..0000000 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.javabridge.generic; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.javabridge.BridgeHandlerManager; -import org.apache.reef.javabridge.EvaluatorRequestorBridge; - -/** - * An initializer interface that initializes ClrHandlers for the CLR {@link JobDriver}. - */ -@DriverSide -@Private -@Unstable -interface ClrHandlersInitializer { - - /** - * Returns the set of CLR handles. - */ - BridgeHandlerManager getClrHandlers(final String portNumber, final EvaluatorRequestorBridge evaluatorRequestorBridge); -} http://git-wip-us.apache.org/repos/asf/reef/blob/37d1891c/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java deleted file mode 100644 index 1da5347..0000000 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.javabridge.generic; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.driver.restart.DriverRestarted; -import org.apache.reef.javabridge.BridgeHandlerManager; -import org.apache.reef.javabridge.DriverRestartedBridge; -import org.apache.reef.javabridge.EvaluatorRequestorBridge; -import org.apache.reef.javabridge.NativeInterop; - -/** - * An initializer implementation that initializes ClrHandlers for the CLR {@link JobDriver} for the case - * where the Driver has restarted. - */ -@Private -@DriverSide -@Unstable -final class DriverRestartClrHandlersInitializer implements ClrHandlersInitializer { - private final DriverRestarted driverRestarted; - - DriverRestartClrHandlersInitializer(final DriverRestarted driverRestarted) { - this.driverRestarted = driverRestarted; - } - - @Override - public BridgeHandlerManager getClrHandlers(final String portNumber, - final EvaluatorRequestorBridge evaluatorRequestorBridge) { - final BridgeHandlerManager bridgeHandlerManager = new BridgeHandlerManager(); - NativeInterop.callClrSystemOnRestartHandler(portNumber, bridgeHandlerManager, evaluatorRequestorBridge, - new DriverRestartedBridge(driverRestarted)); - - return bridgeHandlerManager; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/37d1891c/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java deleted file mode 100644 index 765869c..0000000 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.javabridge.generic; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.javabridge.BridgeHandlerManager; -import org.apache.reef.javabridge.EvaluatorRequestorBridge; -import org.apache.reef.javabridge.NativeInterop; -import org.apache.reef.wake.time.event.StartTime; - -/** - * An initializer implementation that initializes ClrHandlers for the CLR {@link JobDriver} for the case - * of regular Driver startup. - */ -@Private -@DriverSide -@Unstable -final class DriverStartClrHandlersInitializer implements ClrHandlersInitializer { - private final StartTime startTime; - - DriverStartClrHandlersInitializer(final StartTime startTime) { - this.startTime = startTime; - } - - @Override - public BridgeHandlerManager getClrHandlers(final String portNumber, - final EvaluatorRequestorBridge evaluatorRequestorBridge) { - BridgeHandlerManager bridgeHandlerManager = new BridgeHandlerManager(); - NativeInterop.callClrSystemOnStartHandler(startTime.toString(), portNumber, bridgeHandlerManager, - evaluatorRequestorBridge); - - return bridgeHandlerManager; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/37d1891c/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java index b0c8d41..8a0fb86 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java @@ -167,7 +167,7 @@ public final class JobDriver { this.definedRuntimes = definedRuntimes; } - private void setupBridge(final ClrHandlersInitializer initializer) { + private void setupBridge() { // Signal to the clr buffered log handler that the driver has started and that // we can begin logging LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler..."); @@ -196,7 +196,9 @@ public final class JobDriver { this.evaluatorRequestorBridge = new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory, JobDriver.this.definedRuntimes); - JobDriver.this.handlerManager = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge); + JobDriver.this.handlerManager = new BridgeHandlerManager(); + NativeInterop.clrSystemSetupBridgeHandlerManager(portNumber, + JobDriver.this.handlerManager, evaluatorRequestorBridge); try (final LoggingScope lp = this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) { @@ -581,11 +583,15 @@ public final class JobDriver { @Override public void onNext(final StartTime startTime) { try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) { + // CLR bridge setup must be done before other event handlers try to access the CLR bridge + // thus we grab a lock on this instance synchronized (JobDriver.this) { - - setupBridge(new DriverStartClrHandlersInitializer(startTime)); - LOG.log(Level.INFO, "Driver Started"); + setupBridge(); + LOG.log(Level.INFO, "Finished CLR bridge setup for {0}", startTime); } + + NativeInterop.callClrSystemOnStartHandler(); + LOG.log(Level.INFO, "Driver Started"); } } } @@ -598,13 +604,16 @@ public final class JobDriver { @Override public void onNext(final DriverRestarted driverRestarted) { try (final LoggingScope ls = loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) { + // CLR bridge setup must be done before other event handlers try to access the CLR bridge + // thus we lock on this instance synchronized (JobDriver.this) { - JobDriver.this.isRestarted = true; - setupBridge(new DriverRestartClrHandlersInitializer(driverRestarted)); - - LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up."); + setupBridge(); + LOG.log(Level.INFO, "Finished CLR bridge setup for {0}", driverRestarted); } + + NativeInterop.callClrSystemOnRestartHandler(new DriverRestartedBridge(driverRestarted)); + LOG.log(Level.INFO, "Driver Restarted"); } } }
