Repository: reef Updated Branches: refs/heads/master 8b7ae014e -> 2bfb7318f
[REEF-1519] Improve readability of code and logs in REEF launcher Multiple code clean-ups that do not impact the functionality. * Add missing Javadoc comments and expand the existing ones. * Reorder methods to make code more readable. * Fix the formatting for code clarity. * Do minor refactoring to remove duplicate code and improve readability. * Minor fixes to log messages for clarity and searchability. * Pretty print JSON configuration on startup. JIRA: [REEF-1519](https://issues.apache.org/jira/browse/REEF-1519) Pull request: Closes #1091 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/2bfb7318 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/2bfb7318 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/2bfb7318 Branch: refs/heads/master Commit: 2bfb7318f5694b30884ebf240156de4750eaa6e2 Parents: 8b7ae01 Author: Sergiy Matusevych <[email protected]> Authored: Wed Aug 10 18:18:10 2016 -0700 Committer: Sergey Dudoladov <[email protected]> Committed: Thu Aug 11 18:54:05 2016 -0700 ---------------------------------------------------------------------- .../reef/runtime/common/REEFLauncher.java | 210 ++++++++++++------- .../formats/AvroConfigurationSerializer.java | 18 +- 2 files changed, 155 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/2bfb7318/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java old mode 100644 new mode 100755 index 917fd00..dfe693f --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java +++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java @@ -30,6 +30,7 @@ import org.apache.reef.tang.annotations.NamedParameter; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.exceptions.BindException; import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.AvroConfigurationSerializer; import org.apache.reef.tang.formats.ConfigurationSerializer; import org.apache.reef.util.EnvironmentUtils; import org.apache.reef.util.REEFVersion; @@ -47,114 +48,149 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * The main entrance point into any REEF process. It is mostly reading from the command line to instantiate + * The main entry point into any REEF process (Driver and Evaluator). + * It is mostly reading from the command line to instantiate * the runtime clock and calling .run() on it. */ public final class REEFLauncher { /** - * Parameter which enables profiling. + * Parameter to enable profiling. By default profiling is disabled. 
*/ @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false") - public static final class ProfilingEnabled implements Name<Boolean> { - } + private static final class ProfilingEnabled implements Name<Boolean> { } private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName()); - private static final Configuration LAUNCHER_STATIC_CONFIG = Tang.Factory.getTang().newConfigurationBuilder() - .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class) - .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class) + private static final Tang TANG = Tang.Factory.getTang(); + + private static final Configuration LAUNCHER_STATIC_CONFIG = + TANG.newConfigurationBuilder() .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER") + .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class) .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class) + .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class) .build(); static { LoggingSetup.setupCommonsLogging(); } - private final String configurationPath; - private final boolean isProfilingEnabled; - private final ConfigurationSerializer configurationSerializer; + private final boolean isWakeProfilingEnabled; + + /** REEF version - we need it simply to write it to the log. */ private final REEFVersion reefVersion; + + /** + * Main configuration object of the REEF component we are launching here. + * The launcher uses that configuration to instantiate the Clock object, + * and then call .run() on it. + */ private final Configuration clockConfig; + /** + * REEFLauncher is instantiated in the main() method below using + * Tang configuration file provided as a command line argument. + * @param configurationPath Path to the serialized Tang configuration file. + * (The file must be in the local file system). 
+ * @param enableProfiling If true, turn on profiling in Wake. + * @param configurationSerializer Serializer used to read the configuration file. + * We currently use Avro to serialize Tang configs. + * @param reefVersion An injectable object that contains REEF version. + */ @Inject - private REEFLauncher(@Parameter(ClockConfigurationPath.class) final String configurationPath, - @Parameter(ProfilingEnabled.class) final boolean enableProfiling, - final ConfigurationSerializer configurationSerializer, - final REEFVersion reefVersion) { - this.configurationPath = configurationPath; - this.configurationSerializer = configurationSerializer; - this.isProfilingEnabled = enableProfiling; + private REEFLauncher( + @Parameter(ClockConfigurationPath.class) final String configurationPath, + @Parameter(ProfilingEnabled.class) final boolean enableProfiling, + final ConfigurationSerializer configurationSerializer, + final REEFVersion reefVersion) { + + this.isWakeProfilingEnabled = enableProfiling; this.reefVersion = reefVersion; + this.clockConfig = Configurations.merge( - readConfigurationFromDisk(this.configurationPath, this.configurationSerializer), - LAUNCHER_STATIC_CONFIG); + readConfigurationFromDisk(configurationPath, configurationSerializer), + LAUNCHER_STATIC_CONFIG); } + /** + * Instantiate REEF Launcher. This method is called from REEFLauncher.main(). + * @param clockConfigPath Path to the local file that contains serialized configuration + * of a REEF component to launch (can be either Driver or Evaluator). + * @return An instance of the configured REEFLauncher object. 
+ */ private static REEFLauncher getREEFLauncher(final String clockConfigPath) { - final Injector injector; + try { - final Configuration clockArgConfig = Tang.Factory.getTang().newConfigurationBuilder() - .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath).build(); - injector = Tang.Factory.getTang().newInjector(clockArgConfig); + + final Configuration clockArgConfig = TANG.newConfigurationBuilder() + .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath) + .build(); + + return TANG.newInjector(clockArgConfig).getInstance(REEFLauncher.class); + } catch (final BindException e) { throw fatal("Error in parsing the command line", e); - } - - try { - return injector.getInstance(REEFLauncher.class); } catch (final InjectionException e) { throw fatal("Unable to run REEFLauncher.", e); } } - private static RuntimeException fatal(final String msg, final Throwable t) { - LOG.log(Level.SEVERE, msg, t); - return new RuntimeException(msg, t); - } - - private static RuntimeException fatal(final REEFErrorHandler errorHandler, final String msg, final Throwable t) { - errorHandler.onNext(t); - LOG.log(Level.SEVERE, msg, t); - return new RuntimeException(msg, t); - } - - @SuppressWarnings("checkstyle:constructorwithoutparams") // avoids logging the same message twice in fatal() + /** + * Read configuration from a given file and deserialize it + * into Tang configuration object that can be used for injection. + * Configuration is currently serialized using Avro. + * This method also prints full deserialized configuration into log. + * @param configPath Path to the local file that contains serialized configuration + * of a REEF component to launch (can be either Driver or Evaluator). + * @param serializer An object to deserialize the configuration file. + * @return Tang configuration read and deserialized from a given file. 
+ */ private static Configuration readConfigurationFromDisk( final String configPath, final ConfigurationSerializer serializer) { - LOG.log(Level.FINEST, "Loading configuration file: {0}", configPath); + + LOG.log(Level.FINER, "Loading configuration file: {0}", configPath); final File evaluatorConfigFile = new File(configPath); if (!evaluatorConfigFile.exists()) { - final String message = "The configuration file " + configPath + - "doesn't exist. This points to an issue in the job submission."; - throw fatal(message, new FileNotFoundException()); - } else if (!evaluatorConfigFile.canRead()) { - final String message = "The configuration file " + configPath + - " exists, but can't be read"; - throw fatal(message, new IOException()); - } else { - try { - return serializer.fromFile(evaluatorConfigFile); - } catch (final IOException e) { - final String message = "Unable to parse the configuration file " + configPath; - throw fatal(message, e); + throw fatal( + "Configuration file " + configPath + " does not exist. Can be an issue in job submission.", + new FileNotFoundException(configPath)); + } + + if (!evaluatorConfigFile.canRead()) { + throw fatal( + "Configuration file " + configPath + " exists, but can't be read.", + new IOException(configPath)); + } + + try { + + final Configuration config = serializer.fromFile(evaluatorConfigFile); + + if (LOG.isLoggable(Level.FINEST)) { + LOG.log(Level.FINEST, "Configuration file {0} loaded:\n--\n{1}\n--", + new Object[] {configPath, new AvroConfigurationSerializer().toString(config, true)}); } + + return config; + + } catch (final IOException e) { + throw fatal("Unable to parse the configuration file: " + configPath, e); } } /** * Launches a REEF client process (Driver or Evaluator). - * - * @param args command-line args + * @param args Command-line arguments - + * must be a single element containing local path to the configuration file. 
*/ @SuppressWarnings("checkstyle:illegalcatch") public static void main(final String[] args) { + LOG.log(Level.INFO, "Entering REEFLauncher.main()."); LOG.log(Level.FINE, "REEFLauncher started with user name [{0}]", System.getProperty("user.name")); - LOG.log(Level.FINE, "REEFLauncher started. Assertions are {0} in this process.", EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED"); @@ -168,12 +204,14 @@ public final class REEFLauncher { final REEFLauncher launcher = getREEFLauncher(args[0]); Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.clockConfig)); - launcher.logVersion(); + launcher.reefVersion.logVersion(); // Write REEF version to the log. try (final Clock clock = launcher.getClockFromConfig()) { - LOG.log(Level.FINE, "Clock starting"); + + LOG.log(Level.FINE, "Clock: start"); clock.run(); - LOG.log(Level.FINE, "Clock exiting"); + LOG.log(Level.FINE, "Clock: exit normally"); + } catch (final Throwable ex) { try (final REEFErrorHandler errorHandler = launcher.getErrorHandlerFromConfig()) { throw fatal(errorHandler, "Unable to instantiate the clock", ex); @@ -183,38 +221,39 @@ public final class REEFLauncher { } LOG.log(Level.INFO, "Exiting REEFLauncher.main()"); + if (LOG.isLoggable(Level.FINEST)) { LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after REEFLauncher.close():")); } + System.exit(0); + if (LOG.isLoggable(Level.FINEST)) { LOG.log(Level.FINEST, ThreadLogger.getFormattedThreadList("Threads running after System.exit():")); } } - private void logVersion() { - this.reefVersion.logVersion(); - } - /** - * A new REEFErrorHandler is instantiated instead of lazy instantiation and saving the instantiated - * handler as a field since the ErrorHandler is closeable. 
- * @return A new REEFErrorHandler from clock config - * @throws InjectionException + * A new REEFErrorHandler is instantiated instead of lazy instantiation + * and saving the instantiated handler as a field since the ErrorHandler is closeable. + * @return A REEFErrorHandler object instantiated from clock config. + * @throws InjectionException configuration error. */ private REEFErrorHandler getErrorHandlerFromConfig() throws InjectionException { - return Tang.Factory.getTang().newInjector(this.clockConfig).getInstance(REEFErrorHandler.class); + return TANG.newInjector(this.clockConfig).getInstance(REEFErrorHandler.class); } /** * A new Clock is instantiated instead of lazy instantiation and saving the instantiated * handler as a field since the Clock is closeable. - * @return A new Clock from clock config - * @throws InjectionException + * @return A Clock object instantiated from the configuration. + * @throws InjectionException configuration error. */ private Clock getClockFromConfig() throws InjectionException { - final Injector clockInjector = Tang.Factory.getTang().newInjector(this.clockConfig); - if (this.isProfilingEnabled) { + + final Injector clockInjector = TANG.newInjector(this.clockConfig); + + if (this.isWakeProfilingEnabled) { final WakeProfiler profiler = new WakeProfiler(); ProfilingStopHandler.setProfiler(profiler); clockInjector.bindAspect(profiler); @@ -222,4 +261,31 @@ public final class REEFLauncher { return clockInjector.getInstance(Clock.class); } + + /** + * Wrap an exception into RuntimeException with a given message, + * and write the same message and exception to the log. + * @param msg an error message to log and pass into the RuntimeException. + * @param t A Throwable exception to log and wrap. + * @return a new Runtime exception wrapping a Throwable. 
+ */ + private static RuntimeException fatal(final String msg, final Throwable t) { + LOG.log(Level.SEVERE, msg, t); + return new RuntimeException(msg, t); + } + + /** + * Pass exception into an error handler, then wrap it into RuntimeException + * with a given message, and write the same message and exception to the log. + * @param errorHandler an error handler that consumes the exception before any further processing. + * @param msg an error message to log and pass into the RuntimeException. + * @param t A Throwable exception to log, wrap, and handle. + * @return a new Runtime exception wrapping a Throwable. + */ + private static RuntimeException fatal( + final REEFErrorHandler errorHandler, final String msg, final Throwable t) { + LOG.log(Level.SEVERE, msg, t); + errorHandler.onNext(t); + return new RuntimeException(msg, t); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/2bfb7318/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/formats/AvroConfigurationSerializer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/formats/AvroConfigurationSerializer.java b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/formats/AvroConfigurationSerializer.java index 5d76050..e5dd027 100644 --- a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/formats/AvroConfigurationSerializer.java +++ b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/formats/AvroConfigurationSerializer.java @@ -223,12 +223,28 @@ public final class AvroConfigurationSerializer implements ConfigurationSerialize return theBytes; } + /** + * Produce a JSON string that represents given configuration. + * @param configuration Tang configuration to convert into a JSON string. + * @return A JSON string that corresponds to the given Tang configuration. 
+ */ @Override public String toString(final Configuration configuration) { + return toString(configuration, false); + } + + /** + * Produce a JSON string that represents given configuration. + * @param configuration Tang configuration to convert into a JSON string. + * @param prettyPrint If true, use new lines and spaces to pretty print the JSON string. + * If false (by default), output JSON as a single line. + * @return A JSON string that corresponds to the given Tang configuration. + */ + public String toString(final Configuration configuration, final boolean prettyPrint) { final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); final String result; try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { - final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(AvroConfiguration.SCHEMA$, out); + final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(AvroConfiguration.SCHEMA$, out, prettyPrint); configurationWriter.write(toAvro(configuration), encoder); encoder.flush(); out.flush();
