Repository: reef Updated Branches: refs/heads/master 6a0947109 -> 59325a311
[REEF-1573] Split REEFLauncher into executable part and the Clock-running environment. This is work towards [REEF-1561](https://issues.apache.org/jira/browse/REEF-1561) *"REEF as a library"* effort. Summary of changes: * Split REEFLauncher into launcher and REEFEnvironment. * Improve error handling in REEFLauncher and around. * Fixes in logging and error reporting. * Add unit test for local environment driver execution. * Minor cosmetic and style fixes. * Add comments to explain checkstyle exceptions * Add several TODOs and related JIRAs for future work JIRA: [REEF-1573](https://issues.apache.org/jira/browse/REEF-1573) This closes #1133 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/59325a31 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/59325a31 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/59325a31 Branch: refs/heads/master Commit: 59325a311ce5c3ba8f44187cff6f169197f71b02 Parents: 6a09471 Author: Sergiy Matusevych <[email protected]> Authored: Thu Sep 22 20:38:56 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Thu Oct 6 10:38:51 2016 -0700 ---------------------------------------------------------------------- .../reef/runtime/common/REEFEnvironment.java | 173 +++++++++++++++++++ .../reef/runtime/common/REEFLauncher.java | 116 ++----------- .../runtime/common/launch/REEFErrorHandler.java | 43 +++-- .../launch/REEFUncaughtExceptionHandler.java | 18 +- .../reef/tests/fail/driver/FailClient.java | 50 +++++- .../reef/tests/fail/driver/FailDriver.java | 2 +- .../tests/driver/REEFEnvironmentDriverTest.java | 60 +++++++ .../apache/reef/tests/fail/FailDriverTest.java | 4 +- 8 files changed, 329 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/59325a31/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java new file mode 100644 index 0000000..2809d44 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common; + +import org.apache.reef.runtime.common.launch.ProfilingStopHandler; +import org.apache.reef.runtime.common.launch.REEFErrorHandler; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.AvroConfigurationSerializer; +import org.apache.reef.util.EnvironmentUtils; +import org.apache.reef.util.REEFVersion; +import org.apache.reef.wake.profiler.WakeProfiler; +import org.apache.reef.wake.time.Clock; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The main entry point into any REEF process (Driver and Evaluator). + * It is mostly reading from the command line to instantiate + * the runtime clock and calling .run() on it. + */ +public final class REEFEnvironment implements Runnable, AutoCloseable { + + /** + * Parameter to enable Wake network profiling. By default profiling is disabled. + * TODO[REEF-1629] Move that parameter and related code into Wake package. + */ + @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false") + private static final class ProfilingEnabled implements Name<Boolean> { } + + private static final Logger LOG = Logger.getLogger(REEFEnvironment.class.getName()); + + private static final Tang TANG = Tang.Factory.getTang(); + + /** Main event loop of current REEF component (Driver or Evaluator). */ + private final Clock clock; + + /** Error handler that processes all uncaught REEF exceptions. */ + private final REEFErrorHandler errorHandler; + + /** + * Create a new REEF environment. + * @param configurations REEF component (Driver or Evaluator) configuration. + * If multiple configurations are provided, they will be merged before use. + * Main part of the configuration is usually read from config file by REEFLauncher. + * @throws InjectionException Thrown on configuration error. + */ + @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler + public static REEFEnvironment fromConfiguration(final Configuration... configurations) throws InjectionException { + + final Configuration config = Configurations.merge(configurations); + + if (LOG.isLoggable(Level.FINEST)) { + // TODO[REEF-1633] Obtain default serializer from Tang, or use Tang to pretty print. + LOG.log(Level.FINEST, "Configuration:\n--\n{0}\n--", + new AvroConfigurationSerializer().toString(config, true)); + } + + final Injector injector = TANG.newInjector(config); + + if (injector.getNamedInstance(ProfilingEnabled.class)) { + final WakeProfiler profiler = new WakeProfiler(); + ProfilingStopHandler.setProfiler(profiler); + injector.bindAspect(profiler); + } + + injector.getInstance(REEFVersion.class).logVersion(); + + final REEFErrorHandler errorHandler = injector.getInstance(REEFErrorHandler.class); + + try { + + final Clock clock = injector.getInstance(Clock.class); + return new REEFEnvironment(clock, errorHandler); + + } catch (final Throwable ex) { + LOG.log(Level.SEVERE, "Error while instantiating the clock", ex); + try { + errorHandler.onNext(ex); + } catch (final Throwable exHandling) { + LOG.log(Level.SEVERE, "Error while handling the exception " + ex, exHandling); + } + throw ex; + } + } + + /** + * Use .fromConfiguration() method to create new REEF environment. + * @param clock main event loop. + * @param errorHandler error handler. + */ + private REEFEnvironment(final Clock clock, final REEFErrorHandler errorHandler) { + this.clock = clock; + this.errorHandler = errorHandler; + } + + /** + * Close and cleanup the environment. + * Invoke .close() on all closeable members (clock and error handler). + */ + @Override + @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler + public void close() { + + LOG.log(Level.FINER, "Closing REEF Environment - start"); + + try { + this.clock.close(); + } catch (final Throwable ex) { + LOG.log(Level.SEVERE, "Error while closing the clock", ex); + try { + this.errorHandler.onNext(ex); + } catch (final Throwable exHandling) { + LOG.log(Level.SEVERE, "Error while handling the exception " + ex, exHandling); + } + } finally { + try { + this.errorHandler.close(); + } catch (final Throwable ex) { + LOG.log(Level.SEVERE, "Error while closing the error handler", ex); + } + } + + LOG.log(Level.FINER, "Closing REEF Environment - end"); + } + + /** + * Launch REEF component (Driver or Evaluator). + * It is usually called from the static .run() method. + */ + @Override + @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler + public void run() { + + LOG.log(Level.FINE, "REEF started with user name [{0}]", System.getProperty("user.name")); + LOG.log(Level.FINE, "REEF started. Assertions are {0} in this process.", + EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED"); + + try { + + LOG.log(Level.FINEST, "Clock: start"); + this.clock.run(); + LOG.log(Level.FINEST, "Clock: exit normally"); + + } catch (final Throwable ex) { + LOG.log(Level.SEVERE, "Clock: Error in main event loop", ex); + this.errorHandler.onNext(ex); + throw ex; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/59325a31/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java index 3ad5fe4..74de099 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java @@ -19,24 +19,18 @@ package org.apache.reef.runtime.common; import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler; -import org.apache.reef.runtime.common.launch.ProfilingStopHandler; import org.apache.reef.runtime.common.launch.REEFErrorHandler; import org.apache.reef.runtime.common.launch.REEFMessageCodec; import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler; import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath; import org.apache.reef.tang.*; -import org.apache.reef.tang.annotations.Name; -import org.apache.reef.tang.annotations.NamedParameter; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.exceptions.BindException; import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.tang.formats.AvroConfigurationSerializer; import org.apache.reef.tang.formats.ConfigurationSerializer; import org.apache.reef.util.EnvironmentUtils; -import org.apache.reef.util.REEFVersion; import org.apache.reef.util.ThreadLogger; import org.apache.reef.util.logging.LoggingSetup; -import org.apache.reef.wake.profiler.WakeProfiler; import org.apache.reef.wake.remote.RemoteConfiguration; import org.apache.reef.wake.time.Clock; @@ -54,12 +48,6 @@ import java.util.logging.Logger; */ public final class REEFLauncher { - /** - * Parameter to enable profiling. By default profiling is disabled. - */ - @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false") - private static final class ProfilingEnabled implements Name<Boolean> { } - private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName()); private static final Tang TANG = Tang.Factory.getTang(); @@ -76,42 +64,28 @@ public final class REEFLauncher { LoggingSetup.setupCommonsLogging(); } - /** Config parameter to turn on network IO profiling in Wake. */ - private final boolean isWakeProfilingEnabled; - - /** REEF version - we need it simply to write it to the log. */ - private final REEFVersion reefVersion; - /** * Main configuration object of the REEF component we are launching here. - * The launcher uses that configuration to instantiate the Clock object, + * REEFEnvironment uses that configuration to instantiate the Clock object, * and then call .run() on it. */ - private final Configuration clockConfig; + private final Configuration envConfig; /** * REEFLauncher is instantiated in the main() method below using * Tang configuration file provided as a command line argument. * @param configurationPath Path to the serialized Tang configuration file. * (The file must be in the local file system). - * @param enableProfiling If true, turn on profiling in Wake. * @param configurationSerializer Serializer used to read the configuration file. * We currently use Avro to serialize Tang configs. - * @param reefVersion An injectable object that contains REEF version. */ @Inject private REEFLauncher( @Parameter(ClockConfigurationPath.class) final String configurationPath, - @Parameter(ProfilingEnabled.class) final boolean enableProfiling, - final ConfigurationSerializer configurationSerializer, - final REEFVersion reefVersion) { - - this.isWakeProfilingEnabled = enableProfiling; - this.reefVersion = reefVersion; + final ConfigurationSerializer configurationSerializer) { - this.clockConfig = Configurations.merge( - readConfigurationFromDisk(configurationPath, configurationSerializer), - LAUNCHER_STATIC_CONFIG); + this.envConfig = Configurations.merge(LAUNCHER_STATIC_CONFIG, + readConfigurationFromDisk(configurationPath, configurationSerializer)); } /** @@ -130,10 +104,10 @@ public final class REEFLauncher { return TANG.newInjector(clockArgConfig).getInstance(REEFLauncher.class); - } catch (final BindException e) { - throw fatal("Error in parsing the command line", e); - } catch (final InjectionException e) { - throw fatal("Unable to run REEFLauncher.", e); + } catch (final BindException ex) { + throw fatal("Error in parsing the command line", ex); + } catch (final InjectionException ex) { + throw fatal("Unable to instantiate REEFLauncher.", ex); } } @@ -169,11 +143,7 @@ public final class REEFLauncher { try { final Configuration config = serializer.fromFile(evaluatorConfigFile); - - if (LOG.isLoggable(Level.FINEST)) { - LOG.log(Level.FINEST, "Configuration file {0} loaded:\n--\n{1}\n--", - new Object[] {configPath, new AvroConfigurationSerializer().toString(config, true)}); - } + LOG.log(Level.FINEST, "Configuration file loaded: {0}", configPath); return config; @@ -184,13 +154,14 @@ public final class REEFLauncher { /** * Launches a REEF client process (Driver or Evaluator). - * @param args Command-line arguments - - * must be a single element containing local path to the configuration file. + * @param args Command-line arguments. + * Must be a single element containing local path to the configuration file. */ @SuppressWarnings("checkstyle:illegalcatch") public static void main(final String[] args) { LOG.log(Level.INFO, "Entering REEFLauncher.main()."); + LOG.log(Level.FINE, "REEFLauncher started with user name [{0}]", System.getProperty("user.name")); LOG.log(Level.FINE, "REEFLauncher started. Assertions are {0} in this process.", EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED"); @@ -204,21 +175,12 @@ public final class REEFLauncher { final REEFLauncher launcher = getREEFLauncher(args[0]); - Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.clockConfig)); - launcher.reefVersion.logVersion(); // Write REEF version to the log. - - try (final Clock clock = launcher.getClockFromConfig()) { - - LOG.log(Level.FINE, "Clock: start"); - clock.run(); - LOG.log(Level.FINE, "Clock: exit normally"); + Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig)); + try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(launcher.envConfig)) { + reef.run(); } catch (final Throwable ex) { - try (final REEFErrorHandler errorHandler = launcher.getErrorHandlerFromConfig()) { - throw fatal(errorHandler, "Unable to instantiate the clock", ex); - } catch (final InjectionException e) { - throw fatal("Unable to instantiate the clock and the ErrorHandler", e); - } + throw fatal("Unable to configure and start REEFEnvironment.", ex); } LOG.log(Level.INFO, "Exiting REEFLauncher.main()"); @@ -235,35 +197,6 @@ public final class REEFLauncher { } /** - * A new REEFErrorHandler is instantiated instead of lazy instantiation - * and saving the instantiated handler as a field since the ErrorHandler is closeable. - * @return A REEFErrorHandler object instantiated from clock config. - * @throws InjectionException configuration error. - */ - private REEFErrorHandler getErrorHandlerFromConfig() throws InjectionException { - return TANG.newInjector(this.clockConfig).getInstance(REEFErrorHandler.class); - } - - /** - * A new Clock is instantiated instead of lazy instantiation and saving the instantiated - * handler as a field since the Clock is closeable. - * @return A Clock object instantiated from the configuration. - * @throws InjectionException configuration error. - */ - private Clock getClockFromConfig() throws InjectionException { - - final Injector clockInjector = TANG.newInjector(this.clockConfig); - - if (this.isWakeProfilingEnabled) { - final WakeProfiler profiler = new WakeProfiler(); - ProfilingStopHandler.setProfiler(profiler); - clockInjector.bindAspect(profiler); - } - - return clockInjector.getInstance(Clock.class); - } - - /** * Wrap an exception into RuntimeException with a given message, * and write the same message and exception to the log. * @param msg an error message to log and pass into the RuntimeException. @@ -274,19 +207,4 @@ public final class REEFLauncher { LOG.log(Level.SEVERE, msg, t); return new RuntimeException(msg, t); } - - /** - * Pass exception into an error handler, then wrap it into RuntimeException - * with a given message, and write the same message and exception to the log. - * @param errorHandler an error handler that consumes the exception before any further processing. - * @param msg an error message to log and pass into the RuntimeException. - * @param t A Throwable exception to log, wrap, and handle. - * @return a new Runtime exception wrapping a Throwable. - */ - private static RuntimeException fatal( - final REEFErrorHandler errorHandler, final String msg, final Throwable t) { - LOG.log(Level.SEVERE, msg, t); - errorHandler.onNext(t); - return new RuntimeException(msg, t); - } } http://git-wip-us.apache.org/repos/asf/reef/blob/59325a31/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java index be71ff5..e32537c 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java @@ -59,24 +59,33 @@ public final class REEFErrorHandler implements EventHandler<Throwable>, AutoClos @Override @SuppressWarnings("checkstyle:illegalcatch") - public void onNext(final Throwable e) { - LOG.log(Level.SEVERE, "Uncaught exception.", e); - if (!this.errorHandlerRID.equals(ErrorHandlerRID.NONE)) { - final EventHandler<ReefServiceProtos.RuntimeErrorProto> runtimeErrorHandler = this.remoteManager.get() - .getHandler(errorHandlerRID, ReefServiceProtos.RuntimeErrorProto.class); - final ReefServiceProtos.RuntimeErrorProto message = ReefServiceProtos.RuntimeErrorProto.newBuilder() - .setName("reef") - .setIdentifier(launchID) - .setMessage(e.getMessage()) - .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(e))) - .build(); - try { - runtimeErrorHandler.onNext(message); - } catch (final Throwable t) { - LOG.log(Level.SEVERE, "Unable to send the error upstream", t); - } - } else { + public void onNext(final Throwable ex) { + + LOG.log(Level.SEVERE, "Uncaught exception.", ex); + + if (this.errorHandlerRID.equals(ErrorHandlerRID.NONE)) { LOG.log(Level.SEVERE, "Caught an exception from Wake we cannot send upstream because there is no upstream"); + return; + } + + try { + + final EventHandler<ReefServiceProtos.RuntimeErrorProto> runtimeErrorHandler = + this.remoteManager.get().getHandler(this.errorHandlerRID, ReefServiceProtos.RuntimeErrorProto.class); + + final ReefServiceProtos.RuntimeErrorProto message = + ReefServiceProtos.RuntimeErrorProto.newBuilder() + .setName("reef") + .setIdentifier(this.launchID) + .setMessage(ex.getMessage()) + .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(ex))) + .build(); + + runtimeErrorHandler.onNext(message); + LOG.log(Level.INFO, "Successfully sent the error upstream: {0}", ex.toString()); + + } catch (final Throwable t) { + LOG.log(Level.SEVERE, "Unable to send the error upstream", t); } } http://git-wip-us.apache.org/repos/asf/reef/blob/59325a31/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java index 340c9b3..dd481a6 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java @@ -51,19 +51,20 @@ public final class REEFUncaughtExceptionHandler implements Thread.UncaughtExcept @Override public synchronized void uncaughtException(final Thread thread, final Throwable throwable) { + + final String msg = "Thread " + thread.getName() + " threw an uncaught exception."; + LOG.log(Level.SEVERE, msg, throwable); + if (this.errorHandler == null) { try { - this.errorHandler = Tang.Factory.getTang().newInjector(this.errorHandlerConfig) - .getInstance(REEFErrorHandler.class); - } catch (InjectionException ie) { + this.errorHandler = Tang.Factory.getTang() + .newInjector(this.errorHandlerConfig).getInstance(REEFErrorHandler.class); + } catch (final InjectionException ie) { LOG.log(Level.WARNING, "Unable to inject error handler."); } } - final String msg = "Thread " + thread.getName() + " threw an uncaught exception."; - if (this.errorHandler != null) { - LOG.log(Level.SEVERE, msg, throwable); this.errorHandler.onNext(new Exception(msg, throwable)); try { this.wait(100); @@ -74,13 +75,12 @@ public final class REEFUncaughtExceptionHandler implements Thread.UncaughtExcept } LOG.log(Level.SEVERE, msg + " System.exit(1)"); + System.exit(1); } @Override public String toString() { - return "REEFUncaughtExceptionHandler{" + - "errorHandler=" + String.valueOf(this.errorHandler) + - '}'; + return "REEFUncaughtExceptionHandler{errorHandler=" + this.errorHandler + '}'; } } http://git-wip-us.apache.org/repos/asf/reef/blob/59325a31/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java index 8e3476b..9d64500 100644 --- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java +++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java @@ -18,12 +18,13 @@ */ package org.apache.reef.tests.fail.driver; +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.Private; import org.apache.reef.client.DriverConfiguration; import org.apache.reef.client.LauncherStatus; +import org.apache.reef.runtime.common.REEFEnvironment; import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.JavaConfigurationBuilder; import org.apache.reef.tang.Tang; -import org.apache.reef.tang.exceptions.BindException; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.tests.TestDriverLauncher; import org.apache.reef.util.EnvironmentUtils; @@ -31,11 +32,11 @@ import org.apache.reef.util.EnvironmentUtils; /** * Client for the test REEF job that fails on different stages of execution. */ +@Private +@ClientSide public final class FailClient { - public static LauncherStatus run(final Class<?> failMsgClass, - final Configuration runtimeConfig, - final int timeOut) throws BindException, InjectionException { + private static Configuration buildDriverConfig(final Class<?> failMsgClass) { final Configuration driverConfig = DriverConfiguration.CONF .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(FailDriver.class)) @@ -56,11 +57,42 @@ public final class FailClient { .set(DriverConfiguration.ON_TASK_COMPLETED, FailDriver.CompletedTaskHandler.class) .build(); - final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); - cb.addConfiguration(driverConfig); - cb.bindNamedParameter(FailDriver.FailMsgClassName.class, failMsgClass.getName()); + return Tang.Factory.getTang().newConfigurationBuilder(driverConfig) + .bindNamedParameter(FailDriver.FailMsgClassName.class, failMsgClass.getName()) + .build(); + } + + /** + * Run REEF on specified runtime and fail (raise an exception) in a specified class. + * @param failMsgClass A class that should fail during the test. + * @param runtimeConfig REEF runtime configuration. Can be e.g. Local or YARN. + * @param timeOut REEF application timeout. + * @return launcher status - usually FAIL. + * @throws InjectionException configuration error. + */ + public static LauncherStatus runClient(final Class<?> failMsgClass, + final Configuration runtimeConfig, final int timeOut) throws InjectionException { + + return TestDriverLauncher.getLauncher(runtimeConfig).run(buildDriverConfig(failMsgClass), timeOut); + } + + /** + * Run REEF in-process using specified runtime and fail (raise an exception) in a specified class. + * @param failMsgClass A class that should fail during the test. + * @param runtimeConfig REEF runtime configuration. Can be e.g. Local or YARN. + * @param timeOut REEF application timeout - not used yet. + * @return launcher status - usually FAIL. + * @throws InjectionException configuration error. + */ + public static LauncherStatus runInProcess(final Class<?> failMsgClass, + final Configuration runtimeConfig, final int timeOut) throws InjectionException { + + try (final REEFEnvironment reef = + REEFEnvironment.fromConfiguration(runtimeConfig, buildDriverConfig(failMsgClass))) { + reef.run(); + } - return TestDriverLauncher.getLauncher(runtimeConfig).run(cb.build(), timeOut); + return LauncherStatus.FORCE_CLOSED; // TODO[REEF-1596]: Use the actual status, when implemented. } /** http://git-wip-us.apache.org/repos/asf/reef/blob/59325a31/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java index c0e6412..fe1b507 100644 --- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java +++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailDriver.java @@ -144,7 +144,7 @@ public final class FailDriver { if (this.state == DriverState.FAILED) { final SimulatedDriverFailure ex = new SimulatedDriverFailure( "Simulated Failure at FailDriver :: " + msgClassName); - LOG.log(Level.INFO, "Simulated Failure: {0}", ex); + LOG.log(Level.INFO, "Simulated Failure:", ex); throw ex; } } http://git-wip-us.apache.org/repos/asf/reef/blob/59325a31/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java new file mode 100644 index 0000000..088aee3 --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.tests.driver; + +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.runtime.common.REEFEnvironment; +import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; +import org.apache.reef.runtime.local.driver.LocalDriverConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.util.EnvironmentUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * This tests whether the noop driver gets shutdown properly. + */ +public final class REEFEnvironmentDriverTest { + + private static final Configuration DRIVER_CONFIG = DriverConfiguration.CONF + .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(DriverTestStartHandler.class)) + .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_DriverTest") + .set(DriverConfiguration.ON_DRIVER_STARTED, DriverTestStartHandler.class) + .build(); + + private static final Configuration LOCAL_DRIVER_MODULE = LocalDriverConfiguration.CONF + .set(LocalDriverConfiguration.MAX_NUMBER_OF_EVALUATORS, 1) + .set(LocalDriverConfiguration.ROOT_FOLDER, ".") + .set(LocalDriverConfiguration.JVM_HEAP_SLACK, 0.0) + .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE) + .set(LocalDriverConfiguration.JOB_IDENTIFIER, "LOCAL_DRIVER_TEST") + .set(LocalDriverConfiguration.RUNTIME_NAMES, org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME) + .build(); + + @Test + public void testREEFEnvironmentDriver() throws BindException, InjectionException { + try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(LOCAL_DRIVER_MODULE, DRIVER_CONFIG)) { + reef.run(); + } catch (final Throwable ex) { + Assert.fail("Local driver execution failed: " + ex); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/59325a31/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java index 821b4bf..7ad803b 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/fail/FailDriverTest.java @@ -62,7 +62,7 @@ public class FailDriverTest { private void failOn(final Class<?> clazz) throws BindException, InjectionException { TestUtils.assertLauncherFailure( - FailClient.run(clazz, + FailClient.runClient(clazz, this.testEnvironment.getRuntimeConfiguration(), this.testEnvironment.getTestTimeout()), SimulatedDriverFailure.class); } @@ -126,7 +126,7 @@ public class FailDriverTest { public void testDriverCompleted() throws BindException, InjectionException { final Configuration runtimeConfiguration = this.testEnvironment.getRuntimeConfiguration(); // FailDriverTest can be replaced with any other class never used in FailDriver - final LauncherStatus status = FailClient.run( + final LauncherStatus status = FailClient.runClient( FailDriverTest.class, runtimeConfiguration, this.testEnvironment.getTestTimeout()); Assert.assertEquals(LauncherStatus.COMPLETED, status); }
