Repository: reef
Updated Branches:
  refs/heads/master 67fe83587 -> d25412e9c


[REEF-1555] Refactor REEF Driver logic for code readability and add more 
javadocs

Summary of changes:
  * Cosmetic changes + extra javadocs and comments in DriverRuntimeStartHandler
  * Add javadocs to HandlerContainer class description
  * Refactor DriverStatusManager for readability
  * Add javadocs and deprecate the sendJobEndingMessageToClient() method. No
    changes in the logic
  * Cosmetic changes in DriverRuntimeStopHandler for readability
  * Add public method DriverStatusManager.onRuntimeStop() to be used instead of
    the deprecated sendJobEndingMessageToClient()
  * Cosmetic changes for readability + javadocs in ResourceManagerStatus class
  * Cosmetic changes for code readability in YarnContainerManager
  * Add javadocs to YarnContainerManager
  * Refactor local ContainerManager for readability + use more idiomatic Java.
    No changes in functionality
  * More cosmetic changes to ContainerManager
  * Cosmetic cleanups in RuntimesHost code. No changes in logic

JIRA:
  [REEF-1555](https://issues.apache.org/jira/browse/REEF-1555)

Pull Request:
  This closes #1111


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d25412e9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d25412e9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d25412e9

Branch: refs/heads/master
Commit: d25412e9cb17de688c0db9809dcc1ad314d8337d
Parents: 67fe835
Author: Sergiy Matusevych <[email protected]>
Authored: Tue Aug 30 19:10:28 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Sep 7 18:31:00 2016 -0700

----------------------------------------------------------------------
 .../driver/DriverRuntimeStartHandler.java       |  47 ++-
 .../common/driver/DriverRuntimeStopHandler.java |  27 +-
 .../common/driver/DriverStatusManager.java      | 161 ++++++----
 .../common/driver/idle/DriverIdleManager.java   |  63 ++--
 .../resourcemanager/ResourceManagerStatus.java  | 106 +++----
 .../runtime/local/driver/ContainerManager.java  | 188 +++++------
 .../reef/runtime/multi/driver/RuntimesHost.java | 129 ++++----
 .../yarn/driver/YarnContainerManager.java       | 314 ++++++++++++-------
 8 files changed, 587 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
index 612283f..a3ef519 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
@@ -35,11 +35,14 @@ import java.util.logging.Logger;
 /**
  * The RuntimeStart handler of the Driver.
  * <p>
- * This instantiates the DriverSingletons upon construction. Upon onNext(), it 
sets the resource manager status and
- * wires up the remote event handler connections to the client and the 
evaluators.
+ * This instantiates the DriverSingletons upon construction. Upon onNext(),
+ * it sets the resource manager status and wires up the remote event handler
+ * connections to the client and the evaluators.
  */
 final class DriverRuntimeStartHandler implements EventHandler<RuntimeStart> {
+
   private static final Logger LOG = 
Logger.getLogger(DriverRuntimeStartHandler.class.getName());
+
   private final RemoteManager remoteManager;
   private final EvaluatorResourceManagerErrorHandler 
evaluatorResourceManagerErrorHandler;
   private final EvaluatorHeartbeatHandler evaluatorHeartbeatHandler;
@@ -57,13 +60,15 @@ final class DriverRuntimeStartHandler implements 
EventHandler<RuntimeStart> {
    * @param driverStatusManager                  will be set to RUNNING in 
onNext()
    */
   @Inject
-  DriverRuntimeStartHandler(final DriverSingletons singletons,
-                            final RemoteManager remoteManager,
-                            final EvaluatorResourceManagerErrorHandler 
evaluatorResourceManagerErrorHandler,
-                            final EvaluatorHeartbeatHandler 
evaluatorHeartbeatHandler,
-                            final ResourceManagerStatus resourceManagerStatus,
-                            final ResourceManagerStartHandler 
resourceManagerStartHandler,
-                            final DriverStatusManager driverStatusManager) {
+  private DriverRuntimeStartHandler(
+      final DriverSingletons singletons,
+      final RemoteManager remoteManager,
+      final EvaluatorResourceManagerErrorHandler 
evaluatorResourceManagerErrorHandler,
+      final EvaluatorHeartbeatHandler evaluatorHeartbeatHandler,
+      final ResourceManagerStatus resourceManagerStatus,
+      final ResourceManagerStartHandler resourceManagerStartHandler,
+      final DriverStatusManager driverStatusManager) {
+
     this.remoteManager = remoteManager;
     this.evaluatorResourceManagerErrorHandler = 
evaluatorResourceManagerErrorHandler;
     this.evaluatorHeartbeatHandler = evaluatorHeartbeatHandler;
@@ -72,16 +77,34 @@ final class DriverRuntimeStartHandler implements 
EventHandler<RuntimeStart> {
     this.driverStatusManager = driverStatusManager;
   }
 
+  /**
+   * This method is called on start of the REEF Driver runtime event loop.
+   * It contains startup logic for REEF Driver that is independent from a
+   * runtime framework (e.g. Mesos, YARN, Local, etc).
+   * Platform-specific logic is then handled in ResourceManagerStartHandler.
+   * @param runtimeStart An event that signals start of the Driver runtime.
+   * Contains a timestamp and can be pretty printed.
+   */
   @Override
   public synchronized void onNext(final RuntimeStart runtimeStart) {
+
     LOG.log(Level.FINEST, "RuntimeStart: {0}", runtimeStart);
 
-    
this.remoteManager.registerHandler(EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.class,
-        evaluatorHeartbeatHandler);
-    
this.remoteManager.registerHandler(ReefServiceProtos.RuntimeErrorProto.class, 
evaluatorResourceManagerErrorHandler);
+    // Register for heartbeats and error messages from the Evaluators.
+    this.remoteManager.registerHandler(
+        EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.class,
+        this.evaluatorHeartbeatHandler);
+
+    this.remoteManager.registerHandler(
+        ReefServiceProtos.RuntimeErrorProto.class,
+        this.evaluatorResourceManagerErrorHandler);
+
     this.resourceManagerStatus.setRunning();
     this.driverStatusManager.onRunning();
+
+    // Forward start event to the runtime-specific handler (e.g. YARN, Local, 
etc.)
     this.resourceManagerStartHandler.onNext(runtimeStart);
+
     LOG.log(Level.FINEST, "DriverRuntimeStartHandler complete.");
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
index e57ba0a..678a596 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
@@ -41,6 +41,7 @@ import java.util.logging.Logger;
 @Private
 @DriverSide
 final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
+
   private static final Logger LOG = 
Logger.getLogger(DriverRuntimeStopHandler.class.getName());
 
   private final DriverStatusManager driverStatusManager;
@@ -50,12 +51,13 @@ final class DriverRuntimeStopHandler implements 
EventHandler<RuntimeStop> {
   private final boolean preserveEvaluatorsAcrossRestarts;
 
   @Inject
-  DriverRuntimeStopHandler(final DriverStatusManager driverStatusManager,
-                           final ResourceManagerStopHandler 
resourceManagerStopHandler,
-                           final RemoteManager remoteManager,
-                           final Evaluators evaluators,
-                           @Parameter(ResourceManagerPreserveEvaluators.class)
-                           final boolean preserveEvaluatorsAcrossRestarts) {
+  DriverRuntimeStopHandler(
+      final DriverStatusManager driverStatusManager,
+      final ResourceManagerStopHandler resourceManagerStopHandler,
+      final RemoteManager remoteManager,
+      final Evaluators evaluators,
+      @Parameter(ResourceManagerPreserveEvaluators.class) final boolean 
preserveEvaluatorsAcrossRestarts) {
+
     this.driverStatusManager = driverStatusManager;
     this.resourceManagerStopHandler = resourceManagerStopHandler;
     this.remoteManager = remoteManager;
@@ -65,25 +67,30 @@ final class DriverRuntimeStopHandler implements 
EventHandler<RuntimeStop> {
 
   @Override
   public synchronized void onNext(final RuntimeStop runtimeStop) {
+
     LOG.log(Level.FINEST, "RuntimeStop: {0}", runtimeStop);
 
+    final Throwable runtimeException = runtimeStop.getException();
+
     // Shut down evaluators if there are no exceptions, the driver is 
forcefully
     // shut down by a non-recoverable exception, or restart is not enabled.
-    if (runtimeStop.getException() == null ||
-        runtimeStop.getException() instanceof DriverFatalRuntimeException ||
+    if (runtimeException == null ||
+        runtimeException instanceof DriverFatalRuntimeException ||
         !this.preserveEvaluatorsAcrossRestarts) {
       this.evaluators.close();
     }
 
     this.resourceManagerStopHandler.onNext(runtimeStop);
+
     // Inform the client of the shutdown.
-    final Optional<Throwable> exception = 
Optional.<Throwable>ofNullable(runtimeStop.getException());
-    this.driverStatusManager.sendJobEndingMessageToClient(exception);
+    
this.driverStatusManager.onRuntimeStop(Optional.ofNullable(runtimeException));
+
     // Close the remoteManager.
     try {
       this.remoteManager.close();
       LOG.log(Level.INFO, "Driver shutdown complete");
     } catch (final Exception e) {
+      LOG.log(Level.WARNING, "Error when closing the RemoteManager", e);
       throw new RuntimeException("Unable to close the RemoteManager.", e);
     }
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
index 86a4429..129e702 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
@@ -33,44 +33,53 @@ import java.util.logging.Logger;
 
 /**
  * Manages the Driver's status.
+ * Communicates status changes to the client and shuts down the runtime clock 
on shutdown.
  */
 public final class DriverStatusManager {
+
   private static final Logger LOG = 
Logger.getLogger(DriverStatusManager.class.getName());
+  private static final String CLASS_NAME = 
DriverStatusManager.class.getCanonicalName();
+
   private final Clock clock;
   private final ClientConnection clientConnection;
   private final String jobIdentifier;
   private final ExceptionCodec exceptionCodec;
+
   private DriverStatus driverStatus = DriverStatus.PRE_INIT;
   private Optional<Throwable> shutdownCause = Optional.empty();
   private boolean driverTerminationHasBeenCommunicatedToClient = false;
 
-
   /**
-   * @param clock
-   * @param clientConnection
-   * @param jobIdentifier
-   * @param exceptionCodec
+   * Build a new status manager. This is done automatically by Tang.
+   * @param clock runtime event loop to shut down on completion or error.
+   * @param clientConnection Connection to the job client. Send init, running, 
and job ending messages.
+   * @param jobIdentifier String job ID.
+   * @param exceptionCodec codec to serialize the exception when sending job 
ending message to the client.
    */
   @Inject
-  DriverStatusManager(final Clock clock,
-                      final ClientConnection clientConnection,
-                      @Parameter(JobIdentifier.class) final String 
jobIdentifier,
-                      final ExceptionCodec exceptionCodec) {
-    LOG.entering(DriverStatusManager.class.getCanonicalName(), "<init>");
+  private DriverStatusManager(
+      @Parameter(JobIdentifier.class) final String jobIdentifier,
+      final Clock clock,
+      final ClientConnection clientConnection,
+      final ExceptionCodec exceptionCodec) {
+
+    LOG.entering(CLASS_NAME, "<init>");
+
     this.clock = clock;
     this.clientConnection = clientConnection;
     this.jobIdentifier = jobIdentifier;
     this.exceptionCodec = exceptionCodec;
+
     LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'");
-    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "<init>");
+
+    LOG.exiting(CLASS_NAME, "<init>");
   }
 
   /**
    * Check whether a state transition 'from->to' is legal.
-   *
-   * @param from
-   * @param to
-   * @return
+   * @param from Source state.
+   * @param to Destination state.
+   * @return true if transition is valid, false otherwise.
    */
   private static boolean isLegalTransition(final DriverStatus from, final 
DriverStatus to) {
     switch (from) {
@@ -108,10 +117,13 @@ public final class DriverStatusManager {
    * Changes the driver status to INIT and sends message to the client about 
the transition.
    */
   public synchronized void onInit() {
-    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onInit");
+
+    LOG.entering(CLASS_NAME, "onInit");
+
     this.clientConnection.send(this.getInitMessage());
     this.setStatus(DriverStatus.INIT);
-    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onInit");
+
+    LOG.exiting(CLASS_NAME, "onInit");
   }
 
   /**
@@ -119,22 +131,27 @@ public final class DriverStatusManager {
    * If the driver is in status 'PRE_INIT', this first calls onInit();
    */
   public synchronized void onRunning() {
-    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onRunning");
+
+    LOG.entering(CLASS_NAME, "onRunning");
+
     if (this.driverStatus.equals(DriverStatus.PRE_INIT)) {
       this.onInit();
     }
+
     this.clientConnection.send(this.getRunningMessage());
     this.setStatus(DriverStatus.RUNNING);
-    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onRunning");
+
+    LOG.exiting(CLASS_NAME, "onRunning");
   }
 
   /**
    * End the Driver with an exception.
-   *
-   * @param exception
+   * @param exception Exception that causes the driver shutdown.
    */
   public synchronized void onError(final Throwable exception) {
-    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onError", new 
Object[]{exception});
+
+    LOG.entering(CLASS_NAME, "onError", exception);
+
     if (this.isShuttingDownOrFailing()) {
       LOG.log(Level.WARNING, "Received an exception while already in 
shutdown.", exception);
     } else {
@@ -143,63 +160,78 @@ public final class DriverStatusManager {
       this.clock.stop(exception);
       this.setStatus(DriverStatus.FAILING);
     }
-    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onError", new 
Object[]{exception});
+
+    LOG.exiting(CLASS_NAME, "onError", exception);
   }
 
   /**
    * Perform a clean shutdown of the Driver.
    */
-  @SuppressWarnings("checkstyle:constructorwithoutparams") // Exception() here 
captures the callstack
   public synchronized void onComplete() {
-    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onComplete");
+
+    LOG.entering(CLASS_NAME, "onComplete");
+
     if (this.isShuttingDownOrFailing()) {
-      LOG.log(Level.WARNING, "Ignoring second call to onComplete()");
+      LOG.log(Level.WARNING, "Ignoring second call to onComplete()",
+          new Exception("Dummy exception to get the call stack"));
     } else {
       LOG.log(Level.INFO, "Clean shutdown of the Driver.");
       if (LOG.isLoggable(Level.FINEST)) {
-        LOG.log(Level.FINEST, "Callstack: ", new Exception());
+        LOG.log(Level.FINEST, "Call stack: ",
+            new Exception("Dummy exception to get the call stack"));
       }
       this.clock.close();
       this.setStatus(DriverStatus.SHUTTING_DOWN);
     }
-    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onComplete");
 
+    LOG.exiting(CLASS_NAME, "onComplete");
+  }
+
+  /**
+   * Sends the final message to the client. This is used by 
DriverRuntimeStopHandler.onNext().
+   * @param exception Exception that caused the job to end (optional).
+   */
+  public synchronized void onRuntimeStop(final Optional<Throwable> exception) {
+    this.sendJobEndingMessageToClient(exception);
   }
 
   /**
    * Sends the final message to the Driver. This is used by 
DriverRuntimeStopHandler.onNext().
-   *
-   * @param exception
+   * @param exception Exception that caused the job to end (can be absent).
+   * @deprecated TODO[JIRA REEF-1548] Do not use DriverStatusManager as a 
proxy to the job client.
+   * After release 0.16, make this method private and use it inside 
onRuntimeStop() method instead.
    */
   public synchronized void sendJobEndingMessageToClient(final 
Optional<Throwable> exception) {
-    if (this.isNotShuttingDownOrFailing()) {
+
+    if (!this.isShuttingDownOrFailing()) {
       LOG.log(Level.SEVERE, "Sending message in a state different that 
SHUTTING_DOWN or FAILING. " +
-          "This is likely a illegal call to clock.close() at play. Current 
state: " + this.driverStatus);
+          "This is likely a illegal call to clock.close() at play. Current 
state: {0}", this.driverStatus);
     }
+
     if (this.driverTerminationHasBeenCommunicatedToClient) {
       LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice. 
Ignoring the second call");
-    } else {
-      // Log the shutdown situation
-      if (this.shutdownCause.isPresent()) {
-        LOG.log(Level.WARNING, "Sending message about an unclean driver 
shutdown.", this.shutdownCause.get());
-      }
-      if (exception.isPresent()) {
-        LOG.log(Level.WARNING, "There was an exception during clock.close().", 
exception.get());
-      }
-      if (this.shutdownCause.isPresent() && exception.isPresent()) {
-        LOG.log(Level.WARNING, "The driver is shutdown because of an exception 
(see above) and there was " +
-            "an exception during clock.close(). Only the first exception will 
be sent to the client");
-      }
+      return;
+    }
 
-      if (this.shutdownCause.isPresent()) {
-        // Send the earlier exception, if there was one
-        this.clientConnection.send(getJobEndingMessage(this.shutdownCause));
-      } else {
-        // Send the exception passed, if there was one.
-        this.clientConnection.send(getJobEndingMessage(exception));
-      }
-      this.driverTerminationHasBeenCommunicatedToClient = true;
+    // Log the shutdown situation
+    if (this.shutdownCause.isPresent()) {
+      LOG.log(Level.WARNING, "Sending message about an unclean driver 
shutdown.", this.shutdownCause.get());
+    }
+
+    if (exception.isPresent()) {
+      LOG.log(Level.WARNING, "There was an exception during clock.close().", 
exception.get());
+    }
+
+    if (this.shutdownCause.isPresent() && exception.isPresent()) {
+      LOG.log(Level.WARNING, "The driver is shutdown because of an exception 
(see above) and there was " +
+          "an exception during clock.close(). Only the first exception will be 
sent to the client");
     }
+
+    // Send the earlier exception, if there was one. Otherwise, send the 
exception passed.
+    this.clientConnection.send(getJobEndingMessage(
+        this.shutdownCause.isPresent() ? this.shutdownCause : exception));
+
+    this.driverTerminationHasBeenCommunicatedToClient = true;
   }
 
   public synchronized boolean isShuttingDownOrFailing() {
@@ -207,21 +239,16 @@ public final class DriverStatusManager {
         || DriverStatus.FAILING.equals(this.driverStatus);
   }
 
-  private synchronized boolean isNotShuttingDownOrFailing() {
-    return !isShuttingDownOrFailing();
-  }
-
   /**
-   * Helper method to set the status. This also checks whether the transition 
from the current status to the new one is
-   * legal.
-   *
-   * @param newStatus
+   * Helper method to set the status.
+   * This also checks whether the transition from the current status to the 
new one is legal.
+   * @param newStatus Driver status to transition to.
    */
   private synchronized void setStatus(final DriverStatus newStatus) {
     if (isLegalTransition(this.driverStatus, newStatus)) {
       this.driverStatus = newStatus;
     } else {
-      LOG.log(Level.WARNING, "Illegal state transiton: '" + this.driverStatus 
+ "'->'" + newStatus + "'");
+      LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new 
Object[] {this.driverStatus, newStatus});
     }
   }
 
@@ -229,27 +256,25 @@ public final class DriverStatusManager {
    * @param exception the exception that ended the Driver, if any.
    * @return message to be sent to the client at the end of the job.
    */
-  private synchronized ReefServiceProtos.JobStatusProto 
getJobEndingMessage(final Optional<Throwable> exception) {
-    final ReefServiceProtos.JobStatusProto message;
+  private ReefServiceProtos.JobStatusProto getJobEndingMessage(final 
Optional<Throwable> exception) {
     if (exception.isPresent()) {
-      message = ReefServiceProtos.JobStatusProto.newBuilder()
+      return ReefServiceProtos.JobStatusProto.newBuilder()
           .setIdentifier(this.jobIdentifier)
           .setState(ReefServiceProtos.State.FAILED)
           
.setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get())))
           .build();
     } else {
-      message = ReefServiceProtos.JobStatusProto.newBuilder()
+      return ReefServiceProtos.JobStatusProto.newBuilder()
           .setIdentifier(this.jobIdentifier)
           .setState(ReefServiceProtos.State.DONE)
           .build();
     }
-    return message;
   }
 
   /**
    * @return The message to be sent through the ClientConnection when in state 
INIT.
    */
-  private synchronized ReefServiceProtos.JobStatusProto getInitMessage() {
+  private ReefServiceProtos.JobStatusProto getInitMessage() {
     return ReefServiceProtos.JobStatusProto.newBuilder()
         .setIdentifier(this.jobIdentifier)
         .setState(ReefServiceProtos.State.INIT)
@@ -259,7 +284,7 @@ public final class DriverStatusManager {
   /**
    * @return The message to be sent through the ClientConnection when in state 
RUNNING.
    */
-  private synchronized ReefServiceProtos.JobStatusProto getRunningMessage() {
+  private ReefServiceProtos.JobStatusProto getRunningMessage() {
     return ReefServiceProtos.JobStatusProto.newBuilder()
         .setIdentifier(this.jobIdentifier)
         .setState(ReefServiceProtos.State.RUNNING)

http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
index cdc71dd..285a1b1 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
@@ -32,43 +32,54 @@ import java.util.logging.Logger;
  * Handles the various sources for driver idleness and forwards decisions to 
DriverStatusManager.
  */
 public final class DriverIdleManager {
+
   private static final Logger LOG = 
Logger.getLogger(DriverIdleManager.class.getName());
   private static final Level IDLE_REASONS_LEVEL = Level.FINEST;
+
   private final Set<DriverIdlenessSource> idlenessSources;
   private final InjectionFuture<DriverStatusManager> driverStatusManager;
 
   @Inject
-  DriverIdleManager(@Parameter(DriverIdleSources.class) final 
Set<DriverIdlenessSource> idlenessSources,
-                    final InjectionFuture<DriverStatusManager> 
driverStatusManager) {
+  private DriverIdleManager(
+      @Parameter(DriverIdleSources.class) final Set<DriverIdlenessSource> 
idlenessSources,
+      final InjectionFuture<DriverStatusManager> driverStatusManager) {
+
     this.idlenessSources = idlenessSources;
     this.driverStatusManager = driverStatusManager;
   }
 
+  /**
+   * Check whether all Driver components are idle, and initiate driver 
shutdown if they are.
+   * @param reason An indication whether the component is idle, along with a 
descriptive message.
+   */
   public synchronized void onPotentiallyIdle(final IdleMessage reason) {
-    synchronized (driverStatusManager.get()) {
-      if (this.driverStatusManager.get().isShuttingDownOrFailing()) {
-        LOG.log(IDLE_REASONS_LEVEL, "Ignoring idle call from [{0}] for reason 
[{1}]",
-            new Object[]{reason.getComponentName(), reason.getReason()});
-      } else {
-        boolean isIdle = true;
-        LOG.log(IDLE_REASONS_LEVEL, "Checking for idle because {0} reported 
idleness for reason [{1}]",
-            new Object[]{reason.getComponentName(), reason.getReason()});
-
-
-        for (final DriverIdlenessSource idlenessSource : this.idlenessSources) 
{
-          final IdleMessage idleMessage = idlenessSource.getIdleStatus();
-          LOG.log(IDLE_REASONS_LEVEL, "[{0}] is reporting {1} because [{2}].",
-              new Object[]{idleMessage.getComponentName(), 
idleMessage.isIdle() ? "idle" : "not idle",
-                  idleMessage.getReason()}
-          );
-          isIdle &= idleMessage.isIdle();
-        }
-
-        if (isIdle) {
-          LOG.log(Level.INFO, "All components indicated idle. Initiating 
Driver shutdown.");
-          this.driverStatusManager.get().onComplete();
-        }
-      }
+
+    final DriverStatusManager driverStatusManagerImpl = 
this.driverStatusManager.get();
+
+    if (driverStatusManagerImpl.isShuttingDownOrFailing()) {
+      LOG.log(IDLE_REASONS_LEVEL, "Ignoring idle call from [{0}] for reason 
[{1}]",
+          new Object[] {reason.getComponentName(), reason.getReason()});
+      return;
+    }
+
+    LOG.log(IDLE_REASONS_LEVEL, "Checking for idle because {0} reported 
idleness for reason [{1}]",
+        new Object[] {reason.getComponentName(), reason.getReason()});
+
+    boolean isIdle = true;
+    for (final DriverIdlenessSource idlenessSource : this.idlenessSources) {
+
+      final IdleMessage idleMessage = idlenessSource.getIdleStatus();
+
+      LOG.log(IDLE_REASONS_LEVEL, "[{0}] is reporting {1} because [{2}].",
+          new Object[] {idleMessage.getComponentName(),
+              idleMessage.isIdle() ? "idle" : "not idle", 
idleMessage.getReason()});
+
+      isIdle &= idleMessage.isIdle();
+    }
+
+    if (isIdle) {
+      LOG.log(Level.INFO, "All components indicated idle. Initiating Driver 
shutdown.");
+      driverStatusManagerImpl.onComplete();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
index 273649f..f9a526d 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
@@ -33,15 +33,16 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
- * Manages the status of the Resource Manager.
+ * Manages the status of the Resource Manager and tracks whether it is idle.
  */
 @DriverSide
 @Private
-public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEvent>,
-    DriverIdlenessSource {
+public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEvent>, DriverIdlenessSource {
+
   private static final Logger LOG = 
Logger.getLogger(ResourceManagerStatus.class.getName());
 
   private static final String COMPONENT_NAME = "ResourceManager";
+
   private static final IdleMessage IDLE_MESSAGE =
       new IdleMessage(COMPONENT_NAME, "No outstanding requests or 
allocations", true);
 
@@ -49,15 +50,21 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
   private final DriverStatusManager driverStatusManager;
   private final InjectionFuture<DriverIdleManager> driverIdleManager;
 
-  // Mutable state.
-  private State state =  State.INIT;
+  /** Mutable RM state. */
+  private State state = State.INIT;
+
+  /** Number of container requests outstanding with the RM, as per latest 
RuntimeStatusEvent message. */
   private int outstandingContainerRequests = 0;
+
+  /** Number of containers currently allocated, as per latest 
RuntimeStatusEvent message. */
   private int containerAllocationCount = 0;
 
   @Inject
-  ResourceManagerStatus(final ResourceManagerErrorHandler 
resourceManagerErrorHandler,
-                        final DriverStatusManager driverStatusManager,
-                        final InjectionFuture<DriverIdleManager> 
driverIdleManager) {
+  private ResourceManagerStatus(
+      final ResourceManagerErrorHandler resourceManagerErrorHandler,
+      final DriverStatusManager driverStatusManager,
+      final InjectionFuture<DriverIdleManager> driverIdleManager) {
+
     this.resourceManagerErrorHandler = resourceManagerErrorHandler;
     this.driverStatusManager = driverStatusManager;
     this.driverIdleManager = driverIdleManager;
@@ -65,11 +72,15 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
 
   @Override
   public synchronized void onNext(final RuntimeStatusEvent runtimeStatusEvent) 
{
+
     final State newState = runtimeStatusEvent.getState();
-    LOG.log(Level.FINEST, "Runtime status " + runtimeStatusEvent);
+
+    LOG.log(Level.FINEST, "Runtime status: {0}", runtimeStatusEvent);
+
     this.outstandingContainerRequests = 
runtimeStatusEvent.getOutstandingContainerRequests().orElse(0);
     this.containerAllocationCount = 
runtimeStatusEvent.getContainerAllocationList().size();
-    this.setState(runtimeStatusEvent.getState());
+
+    this.setState(newState);
 
     switch (newState) {
     case FAILED:
@@ -98,23 +109,32 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
   }
 
   /**
-   * @return idle, if there are no outstanding requests or allocations. Not 
idle else.
+   * Driver is idle if, regardless of status, it has no evaluators allocated 
and no pending container requests.
+   * @return true if the driver can be considered idle, false otherwise.
+   */
+  private synchronized boolean isIdle() {
+    return this.outstandingContainerRequests == 0 && 
this.containerAllocationCount == 0;
+  }
+
+  /**
+   * Driver is idle if, regardless of status, it has no evaluators allocated
+   * and no pending container requests. This method is used in the 
DriverIdleManager.
+   * If all DriverIdlenessSource components are idle, DriverIdleManager will 
initiate Driver shutdown.
+   * @return idle, if there are no outstanding requests or allocations. Not 
idle otherwise.
    */
   @Override
   public synchronized IdleMessage getIdleStatus() {
+
     if (this.isIdle()) {
       return IDLE_MESSAGE;
-    } else {
-      final String message = new StringBuilder("There are ")
-          .append(this.outstandingContainerRequests)
-          .append(" outstanding container requests and ")
-          .append(this.containerAllocationCount)
-          .append(" allocated containers")
-          .toString();
-      return new IdleMessage(COMPONENT_NAME, message, false);
     }
-  }
 
+    final String message = String.format(
+        "There are %d outstanding container requests and %d allocated 
containers",
+        this.outstandingContainerRequests, this.containerAllocationCount);
+
+    return new IdleMessage(COMPONENT_NAME, message, false);
+  }
 
   private synchronized void onRMFailure(final RuntimeStatusEvent 
runtimeStatusEvent) {
     assert runtimeStatusEvent.getState() == State.FAILED;
@@ -134,33 +154,18 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
     }
   }
 
-
-  private synchronized boolean isIdle() {
-    return this.hasNoOutstandingRequests()
-        && this.hasNoContainersAllocated();
-  }
-
-  private synchronized boolean isRunning() {
-    return State.RUNNING.equals(this.state);
-  }
-
   /**
-  *
   * Checks if the ResourceManager can switch from the current state to the 
target state.
   * See REEF-826 for the state transition matrix.
-  *
-  * @param from current state
-  * @param to   state to switch to
-  *
-  * @return true if the transition is legal; false otherwise
-  *
+  * @param from current state.
+  * @param to state to switch to.
+  * @return true if the transition is legal; false otherwise.
   */
-  private synchronized boolean isLegalStateTransition(final State from,
-                                                      final State to) {
+  private static boolean isLegalStateTransition(final State from, final State 
to) {
 
     // handle diagonal elements of the transition matrix
-    if (from.equals(to)){
-      LOG.finest("Transition from " + from + " state to the same state.");
+    if (from.equals(to)) {
+      LOG.log(Level.FINEST, "Transition from {0} state to the same state.", 
from);
       return true;
     }
 
@@ -207,30 +212,15 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
 
     default:
       return false;
-
     }
-
   }
 
   private synchronized void setState(final State newState) {
-
     if (isLegalStateTransition(this.state, newState)) {
       this.state = newState;
     } else {
-      throw new IllegalStateException("Resource manager attempts illegal state 
transition from "
-                + this.state + " to "
-                + newState);
+      throw new IllegalStateException(
+          "Resource manager attempts illegal state transition from " + 
this.state + " to " + newState);
     }
-
-  }
-
-
-  private synchronized boolean hasNoOutstandingRequests() {
-    return this.outstandingContainerRequests == 0;
-  }
-
-  private synchronized boolean hasNoContainersAllocated() {
-    return this.containerAllocationCount == 0;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
index daafece..3184fc5 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
@@ -45,14 +45,7 @@ import org.apache.reef.wake.remote.RemoteMessage;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -67,8 +60,7 @@ final class ContainerManager implements AutoCloseable {
 
   private static final Logger LOG = 
Logger.getLogger(ContainerManager.class.getName());
 
-  private static final Collection<String> DEFAULT_RACKS = 
Arrays.asList(RackNames.DEFAULT_RACK_NAME);
-
+  private static final Collection<String> DEFAULT_RACKS = 
Collections.singletonList(RackNames.DEFAULT_RACK_NAME);
 
   /**
    * Map from containerID -> Container.
@@ -104,7 +96,7 @@ final class ContainerManager implements AutoCloseable {
   private final Collection<String> availableRacks;
 
   @Inject
-  ContainerManager(
+  private ContainerManager(
       final RemoteManager remoteManager,
       final REEFFileNames fileNames,
       @Parameter(MaxNumberOfEvaluators.class) final int capacity,
@@ -116,6 +108,7 @@ final class ContainerManager implements AutoCloseable {
       final LocalAddressProvider localAddressProvider,
       @Parameter(DefaultMemorySize.class) final int defaultMemorySize,
       @Parameter(DefaultNumberOfCores.class) final int defaultNumberOfCores) {
+
     this.capacity = capacity;
     this.defaultMemorySize = defaultMemorySize;
     this.defaultNumberOfCores = defaultNumberOfCores;
@@ -144,55 +137,75 @@ final class ContainerManager implements AutoCloseable {
     LOG.log(Level.FINE, "Initialized Container Manager with {0} containers", 
capacity);
   }
 
-  private Collection<String> normalize(final Collection<String> rackNames) {
+  /**
+   * Normalize rack names. Make sure that each rack name starts with a path 
separator /
+   * and does not have a path separator at the end. Also check that no rack 
names
+   * end with a wildcard *, and raise an exception if such a rack name occurs in 
the input.
+   * @param rackNames Collection of rack names to normalize.
+   * @return Collection of normalized rack names.
+   * @throws IllegalArgumentException if validation of some rack names fails.
+   */
+  private static Collection<String> normalize(
+      final Collection<String> rackNames) throws IllegalArgumentException {
+
     return normalize(rackNames, true);
   }
 
   /**
-   * Normalizes the rack names.
-   *
-   * @param rackNames
-   *          the rack names to normalize
-   * @param validateEnd
-   *          if true, throws an exception if the name ends with ANY (*)
-   * @return a normalized collection
+   * Normalize rack names. Make sure that each rack name starts with a path 
separator /
+   * and does not have a path separator at the end. Also, if end validation is 
on, check
+   * that rack name does not end with a wildcard *.
+   * @param rackNames Collection of rack names to normalize.
+   * @param validateEnd If true, throw an exception if the name ends with ANY 
(*).
+   * @return Collection of normalized rack names.
+   * @throws IllegalArgumentException if validation of some rack names fails.
    */
-  private Collection<String> normalize(final Collection<String> rackNames,
-      final boolean validateEnd) {
+  private static Collection<String> normalize(
+      final Collection<String> rackNames, final boolean validateEnd) throws 
IllegalArgumentException {
+
     final List<String> normalizedRackNames = new ArrayList<>(rackNames.size());
-    final Iterator<String> it = rackNames.iterator();
-    while (it.hasNext()) {
-      String rackName = it.next().trim();
+
+    for (String rackName : rackNames) {
+
+      rackName = rackName.trim();
       Validate.notEmpty(rackName, "Rack names cannot be empty");
+
       // should start with a separator
       if (!rackName.startsWith(Constants.RACK_PATH_SEPARATOR)) {
         rackName = Constants.RACK_PATH_SEPARATOR + rackName;
       }
+
       // remove the ending separator
       if (rackName.endsWith(Constants.RACK_PATH_SEPARATOR)) {
         rackName = rackName.substring(0, rackName.length() - 1);
       }
+
       if (validateEnd) {
         Validate.isTrue(!rackName.endsWith(Constants.ANY_RACK));
       }
+
       normalizedRackNames.add(rackName);
     }
+
     return normalizedRackNames;
   }
 
   private void init() {
+
     // evenly distribute the containers among the racks
     // if rack names are not specified, the default rack will be used, so the 
denominator will always be > 0
-    final int capacityPerRack = capacity / availableRacks.size();
-    int missing = capacity % availableRacks.size();
+    final int capacityPerRack = this.capacity / this.availableRacks.size();
+    int missing = this.capacity % this.availableRacks.size();
+
     // initialize the freeNodesPerRackList and the capacityPerRack
-    for (final String rackName : availableRacks) {
-      this.freeNodesPerRack.put(rackName, new HashMap<String, Boolean>());
-      this.capacitiesPerRack.put(rackName, capacityPerRack);
+    for (final String rackName : this.availableRacks) {
+      int currentCapacity = capacityPerRack;
       if (missing > 0) {
-        this.capacitiesPerRack.put(rackName, 
this.capacitiesPerRack.get(rackName) + 1);
-        missing--;
+        ++currentCapacity;
+        --missing;
       }
+      this.capacitiesPerRack.put(rackName, currentCapacity);
+      this.freeNodesPerRack.put(rackName, new HashMap<String, Boolean>());
     }
   }
 
@@ -226,29 +239,25 @@ final class ContainerManager implements AutoCloseable {
   }
 
   private Collection<String> getRackNamesOrDefault(final List<String> 
rackNames) {
-    return CollectionUtils.isNotEmpty(rackNames) ? normalize(rackNames, false)
-        : DEFAULT_RACKS;
+    return CollectionUtils.isNotEmpty(rackNames) ? normalize(rackNames, false) 
: DEFAULT_RACKS;
   }
 
-
   /**
-   * Returns the node name of the container to be allocated if it's available, 
selected from the list of preferred
-   * node names. If the list is empty, then an empty optional is returned
-   *
-   * @param nodeNames
-   *          the list of preferred nodes
-   * @return the node name where to allocate the container
+   * Returns the node name of the container to be allocated if it's available,
+   * selected from the list of preferred node names.
+   * If the list is empty, then an empty optional is returned.
+   * @param nodeNames the list of preferred nodes.
+   * @return the node name where to allocate the container.
    */
   private Optional<String> getPreferredNode(final List<String> nodeNames) {
-    if (CollectionUtils.isNotEmpty(nodeNames)) {
-      for (final String nodeName : nodeNames) {
-        final String possibleRack = racksPerNode.get(nodeName);
-        if (possibleRack != null
-            && freeNodesPerRack.get(possibleRack).containsKey(nodeName)) {
-          return Optional.of(nodeName);
-        }
+
+    for (final String nodeName : nodeNames) {
+      final String possibleRack = this.racksPerNode.get(nodeName);
+      if (possibleRack != null && 
this.freeNodesPerRack.get(possibleRack).containsKey(nodeName)) {
+        return Optional.of(nodeName);
       }
     }
+
     return Optional.empty();
   }
 
@@ -257,60 +266,57 @@ final class ContainerManager implements AutoCloseable {
    * preferred rack names. If the list is empty, and there's space in the 
default
    * rack, then the default rack is returned. The relax locality semantic is
    * enabled if the list of rack names contains '/*', otherwise relax locality
-   * is considered disabled
+   * is considered disabled.
    *
-   * @param rackNames
-   *          the list of preferred racks
-   * @return the rack name where to allocate the container
+   * @param rackNames the list of preferred racks.
+   * @return the rack name where to allocate the container.
    */
   private Optional<String> getPreferredRack(final List<String> rackNames) {
-    final Collection<String> normalized = getRackNamesOrDefault(rackNames);
-    for (final String rackName : normalized) {
-      // if it does not end with the any modifier,
-      // then we should do an exact match
+
+    for (final String rackName : getRackNamesOrDefault(rackNames)) {
+
+      // if it does not end with the any modifier, then we should do an exact 
match
       if (!rackName.endsWith(Constants.ANY_RACK)) {
-        if (freeNodesPerRack.containsKey(rackName)
-            && freeNodesPerRack.get(rackName).size() > 0) {
+        if (freeNodesPerRack.containsKey(rackName) && 
freeNodesPerRack.get(rackName).size() > 0) {
           return Optional.of(rackName);
         }
       } else {
+
         // if ends with the any modifier, we do a prefix match
-        final Iterator<String> it = availableRacks.iterator();
-        while (it.hasNext()) {
-          final String possibleRackName = it.next();
+        for (final String possibleRackName : this.availableRacks) {
+
           // remove the any modifier
-          final String newRackName = rackName.substring(0,
-              rackName.length() - 1);
+          final String newRackName = rackName.substring(0, rackName.length() - 
1);
+
           if (possibleRackName.startsWith(newRackName) &&
-              freeNodesPerRack.get(possibleRackName).size() > 0) {
+              this.freeNodesPerRack.get(possibleRackName).size() > 0) {
             return Optional.of(possibleRackName);
           }
         }
       }
     }
+
     return Optional.empty();
   }
 
   /**
    * Allocates a container based on a request event. First it tries to match a
-   * given node, if it cannot, it tries to get a spot in a rack
-   *
-   * @param requestEvent
-   *          the request event
-   * @return an optional with the container if allocated
+   * given node, if it cannot, it tries to get a spot in a rack.
+   * @param requestEvent resource request event.
+   * @return an optional with the container if allocated.
    */
   Optional<Container> allocateContainer(final ResourceRequestEvent 
requestEvent) {
+
     Container container = null;
-    final Optional<String> nodeName = getPreferredNode(requestEvent
-        .getNodeNameList());
+    final Optional<String> nodeName = 
getPreferredNode(requestEvent.getNodeNameList());
+
     if (nodeName.isPresent()) {
       container = allocateBasedOnNode(
           requestEvent.getMemorySize().orElse(this.defaultMemorySize),
           requestEvent.getVirtualCores().orElse(this.defaultNumberOfCores),
           nodeName.get());
     } else {
-      final Optional<String> rackName = getPreferredRack(requestEvent
-          .getRackNameList());
+      final Optional<String> rackName = 
getPreferredRack(requestEvent.getRackNameList());
       if (rackName.isPresent()) {
         container = allocateBasedOnRack(
             requestEvent.getMemorySize().orElse(this.defaultMemorySize),
@@ -318,11 +324,11 @@ final class ContainerManager implements AutoCloseable {
             rackName.get());
       }
     }
+
     return Optional.ofNullable(container);
   }
 
-  private Container allocateBasedOnNode(final int megaBytes,
-      final int numberOfCores, final String nodeId) {
+  private Container allocateBasedOnNode(final int megaBytes, final int 
numberOfCores, final String nodeId) {
     synchronized (this.containers) {
       // get the rack name
       final String rackName = this.racksPerNode.get(nodeId);
@@ -333,38 +339,41 @@ final class ContainerManager implements AutoCloseable {
     }
   }
 
-  private Container allocateBasedOnRack(final int megaBytes,
-      final int numberOfCores, final String rackName) {
+  private Container allocateBasedOnRack(final int megaBytes, final int 
numberOfCores, final String rackName) {
     synchronized (this.containers) {
+
       // get the first free nodeId in the rack
-      final Set<String> freeNodes = this.freeNodesPerRack.get(rackName)
-          .keySet();
-      final Iterator<String> it = freeNodes.iterator();
+      final Iterator<String> it = 
this.freeNodesPerRack.get(rackName).keySet().iterator();
+
       if (!it.hasNext()) {
-        throw new IllegalArgumentException(
-            "There should be a free node in the specified rack " + rackName);
+        throw new IllegalArgumentException("There should be a free node in the 
specified rack " + rackName);
       }
+
       final String nodeId = it.next();
-      // remove it from the free map
-      this.freeNodesPerRack.get(rackName).remove(nodeId);
+      it.remove();
+
       // allocate
       return allocate(megaBytes, numberOfCores, nodeId, rackName);
     }
   }
 
-  private Container allocate(final int megaBytes, final int numberOfCores,
-      final String nodeId, final String rackName) {
-    final String processID = nodeId + "-"
-        + String.valueOf(System.currentTimeMillis());
+  private Container allocate(
+      final int megaBytes, final int numberOfCores, final String nodeId, final 
String rackName) {
+
+    final String processID = nodeId + "-" + 
String.valueOf(System.currentTimeMillis());
+
     final File processFolder = new File(this.rootFolder, processID);
     if (!processFolder.exists() && !processFolder.mkdirs()) {
       LOG.log(Level.WARNING, "Failed to create [{0}]", 
processFolder.getAbsolutePath());
     }
+
     final ProcessContainer container = new ProcessContainer(
         this.errorHandlerRID, nodeId, processID, processFolder, megaBytes,
         numberOfCores, rackName, this.fileNames, this.processObserver);
+
     this.containers.put(container.getContainerID(), container);
     LOG.log(Level.FINE, "Allocated {0}", container.getContainerID());
+
     return container;
   }
 
@@ -403,13 +412,12 @@ final class ContainerManager implements AutoCloseable {
       if (this.containers.isEmpty()) {
         LOG.log(Level.FINEST, "Clean shutdown with no outstanding 
containers.");
       } else {
-        LOG.log(Level.WARNING, "Dirty shutdown with outstanding containers.");
+        LOG.log(Level.WARNING, "Dirty shutdown with {0} outstanding 
containers.", this.containers.size());
         for (final Container c : this.containers.values()) {
-          LOG.log(Level.WARNING, "Force shutdown of: {0}", c);
+          LOG.log(Level.WARNING, "Force shutdown of container: {0}", c);
           c.close();
         }
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java
 
b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java
index 62646d2..71dad64 100644
--- 
a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java
+++ 
b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java
@@ -21,10 +21,6 @@ package org.apache.reef.runtime.multi.driver;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.Validate;
 import org.apache.reef.runtime.common.driver.api.*;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
-import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
 import 
org.apache.reef.runtime.multi.client.parameters.SerializedRuntimeDefinition;
 import org.apache.reef.runtime.multi.driver.parameters.RuntimeName;
 import org.apache.reef.runtime.multi.utils.MultiRuntimeDefinitionSerializer;
@@ -32,8 +28,8 @@ import 
org.apache.reef.runtime.multi.utils.avro.AvroMultiRuntimeDefinition;
 import org.apache.reef.runtime.multi.utils.avro.AvroRuntimeDefinition;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.JavaConfigurationBuilder;
 import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tang.formats.AvroConfigurationSerializer;
@@ -53,30 +49,36 @@ import java.util.logging.Logger;
  * Hosts the actual runtime implementations and delegates invocations to them.
  */
 final class RuntimesHost {
+
   private static final Logger LOG = 
Logger.getLogger(RuntimesHost.class.getName());
+
   private final AvroMultiRuntimeDefinition runtimeDefinition;
   private final Injector originalInjector;
   private final String defaultRuntimeName;
-  private final MultiRuntimeDefinitionSerializer  runtimeDefinitionSerializer 
= new MultiRuntimeDefinitionSerializer();
+
   private Map<String, Runtime> runtimes;
 
   @Inject
-  private RuntimesHost(final Injector injector,
-                       @Parameter(SerializedRuntimeDefinition.class) final 
String serializedRuntimeDefinition) {
+  private RuntimesHost(
+      final Injector injector,
+      @Parameter(SerializedRuntimeDefinition.class) final String 
serializedRuntimeDefinition) {
+
     this.originalInjector = injector;
+
     try {
-      this.runtimeDefinition = 
this.runtimeDefinitionSerializer.fromString(serializedRuntimeDefinition);
-    } catch (IOException e) {
+      this.runtimeDefinition = new 
MultiRuntimeDefinitionSerializer().fromString(serializedRuntimeDefinition);
+    } catch (final IOException e) {
       throw new RuntimeException("Unable to read runtime configuration.", e);
     }
 
-    this.defaultRuntimeName = 
runtimeDefinition.getDefaultRuntimeName().toString();
+    this.defaultRuntimeName = 
this.runtimeDefinition.getDefaultRuntimeName().toString();
   }
 
   /**
    * Initializes the configured runtimes.
    */
   private synchronized void initialize() {
+
     if (this.runtimes != null) {
       return;
     }
@@ -90,58 +92,60 @@ final class RuntimesHost {
         // fork the original injector because of the same reason.
         // We create new injectors and copy form the original injector what we 
need.
         // rootInjector is an emptyInjector that we copy bindings from the 
original injector into. Then we fork
-        //it to instantiate the actual runtime.
-        Injector rootInjector = Tang.Factory.getTang().newInjector();
+        // it to instantiate the actual runtime.
+        final Injector rootInjector = Tang.Factory.getTang().newInjector();
         initializeInjector(rootInjector);
-        final JavaConfigurationBuilder cb = 
Tang.Factory.getTang().newConfigurationBuilder();
-        cb.bindNamedParameter(RuntimeName.class, 
rd.getRuntimeName().toString());
-        cb.bindImplementation(Runtime.class, RuntimeImpl.class);
 
-        AvroConfigurationSerializer serializer = new 
AvroConfigurationSerializer();
-        Configuration config = 
serializer.fromString(rd.getSerializedConfiguration().toString());
-        final Injector runtimeInjector = rootInjector.forkInjector(config, 
cb.build());
+        final Configuration runtimeConfig =
+            Tang.Factory.getTang().newConfigurationBuilder()
+                .bindNamedParameter(RuntimeName.class, 
rd.getRuntimeName().toString())
+                .bindImplementation(Runtime.class, RuntimeImpl.class)
+                .build();
+
+        final Configuration config =
+            new 
AvroConfigurationSerializer().fromString(rd.getSerializedConfiguration().toString());
+
+        final Injector runtimeInjector = rootInjector.forkInjector(config, 
runtimeConfig);
+
         this.runtimes.put(rd.getRuntimeName().toString(), 
runtimeInjector.getInstance(Runtime.class));
-      } catch (InjectionException e) {
-        throw new RuntimeException("Unable to initialize runtimes.", e);
-      } catch (IOException e) {
+
+      } catch (final IOException | InjectionException e) {
         throw new RuntimeException("Unable to initialize runtimes.", e);
       }
     }
   }
 
   /**
+   * Copy event handler from current class configuration into runtime injector.
+   * This is a helper method called from initializeInjector() only.
+   * @param runtimeInjector Runtime injector to copy event handler to.
+   * @param param Class that identifies the event handler parameter.
+   * @param <T> Type of the event handler.
+   * @throws InjectionException If configuration error occurs.
+   */
+  private <T extends EventHandler<?>> void copyEventHandler(
+      final Injector runtimeInjector, final Class<? extends Name<T>> param) 
throws InjectionException {
+    runtimeInjector.bindVolatileParameter(param, 
this.originalInjector.getNamedInstance(param));
+  }
+
+  /**
    * Initializes injector by copying needed handlers.
    * @param runtimeInjector The injector to initialize
-   * @throws InjectionException
+   * @throws InjectionException on configuration error.
    */
   private void initializeInjector(final Injector runtimeInjector) throws 
InjectionException {
-    final EventHandler<ResourceStatusEvent> statusEventHandler =
-            
this.originalInjector.getNamedInstance(RuntimeParameters.ResourceStatusHandler.class);
-    
runtimeInjector.bindVolatileParameter(RuntimeParameters.ResourceStatusHandler.class,
 statusEventHandler);
-    final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler =
-            
this.originalInjector.getNamedInstance(RuntimeParameters.NodeDescriptorHandler.class);
-    
runtimeInjector.bindVolatileParameter(RuntimeParameters.NodeDescriptorHandler.class,
 nodeDescriptorEventHandler);
-    final EventHandler<ResourceAllocationEvent> resourceAllocationEventHandler 
=
-            
this.originalInjector.getNamedInstance(RuntimeParameters.ResourceAllocationHandler.class);
-    runtimeInjector.bindVolatileParameter(
-            RuntimeParameters.ResourceAllocationHandler.class,
-            resourceAllocationEventHandler);
-    final EventHandler<RuntimeStatusEvent> runtimeStatusEventHandler =
-            
this.originalInjector.getNamedInstance(RuntimeParameters.RuntimeStatusHandler.class);
-    runtimeInjector.bindVolatileParameter(
-            RuntimeParameters.RuntimeStatusHandler.class,
-            runtimeStatusEventHandler);
-    HttpServer httpServer = null;
+
+    copyEventHandler(runtimeInjector, 
RuntimeParameters.ResourceStatusHandler.class);
+    copyEventHandler(runtimeInjector, 
RuntimeParameters.NodeDescriptorHandler.class);
+    copyEventHandler(runtimeInjector, 
RuntimeParameters.ResourceAllocationHandler.class);
+    copyEventHandler(runtimeInjector, 
RuntimeParameters.RuntimeStatusHandler.class);
+
     try {
-      httpServer = this.originalInjector.getInstance(HttpServer.class);
+      runtimeInjector.bindVolatileInstance(HttpServer.class, 
this.originalInjector.getInstance(HttpServer.class));
+      LOG.log(Level.INFO, "Binding http server for the runtime 
implementation");
     } catch (final InjectionException e) {
       LOG.log(Level.INFO, "Http Server is not configured for the runtime", e);
     }
-
-    if (httpServer != null) {
-      runtimeInjector.bindVolatileInstance(HttpServer.class, httpServer);
-      LOG.log(Level.INFO, "Binding http server for the runtime 
implementation");
-    }
   }
 
   /**
@@ -150,39 +154,38 @@ final class RuntimesHost {
    * @return The runtime
    */
   private Runtime getRuntime(final String requestedRuntimeName) {
-    String runtimeName = requestedRuntimeName;
-    if (StringUtils.isBlank(runtimeName)) {
-      runtimeName = this.defaultRuntimeName;
-    }
 
-    Runtime runtime = this.runtimes.get(runtimeName);
+    final String runtimeName =
+        StringUtils.isBlank(requestedRuntimeName) ? this.defaultRuntimeName : 
requestedRuntimeName;
 
+    final Runtime runtime = this.runtimes.get(runtimeName);
     Validate.notNull(runtime, "Couldn't find runtime for name " + runtimeName);
+
     return runtime;
   }
 
-  void onResourceLaunch(final ResourceLaunchEvent value) {
-    getRuntime(value.getRuntimeName()).onResourceLaunch(value);
+  void onResourceLaunch(final ResourceLaunchEvent event) {
+    getRuntime(event.getRuntimeName()).onResourceLaunch(event);
   }
 
-  void onRuntimeStart(final RuntimeStart value) {
+  void onRuntimeStart(final RuntimeStart event) {
     initialize();
-    for (Runtime runtime : this.runtimes.values()) {
-      runtime.onRuntimeStart(value);
+    for (final Runtime runtime : this.runtimes.values()) {
+      runtime.onRuntimeStart(event);
     }
   }
 
-  void onRuntimeStop(final RuntimeStop value) {
-    for (Runtime runtime : this.runtimes.values()) {
-      runtime.onRuntimeStop(value);
+  void onRuntimeStop(final RuntimeStop event) {
+    for (final Runtime runtime : this.runtimes.values()) {
+      runtime.onRuntimeStop(event);
     }
   }
 
-  void onResourceRelease(final ResourceReleaseEvent value) {
-    getRuntime(value.getRuntimeName()).onResourceRelease(value);
+  void onResourceRelease(final ResourceReleaseEvent event) {
+    getRuntime(event.getRuntimeName()).onResourceRelease(event);
   }
 
-  void onResourceRequest(final ResourceRequestEvent value) {
-    getRuntime(value.getRuntimeName()).onResourceRequest(value);
+  void onResourceRequest(final ResourceRequestEvent event) {
+    getRuntime(event.getRuntimeName()).onResourceRequest(event);
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index f38b53a..58fcca1 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -45,7 +45,6 @@ import 
org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
 import org.apache.reef.tang.InjectionFuture;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.util.Optional;
-import org.apache.reef.wake.remote.Encoder;
 import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
 
 import javax.inject.Inject;
@@ -65,15 +64,12 @@ final class YarnContainerManager
   private static final String RUNTIME_NAME = "YARN";
 
   private final YarnClient yarnClient = YarnClient.createYarnClient();
-
   private final Queue<AMRMClient.ContainerRequest> requestsBeforeSentToRM = 
new ConcurrentLinkedQueue<>();
-
   private final Queue<AMRMClient.ContainerRequest> requestsAfterSentToRM = new 
ConcurrentLinkedQueue<>();
-
   private final Map<String, String> nodeIdToRackName = new 
ConcurrentHashMap<>();
 
   private final YarnConfiguration yarnConf;
-  private final AMRMClientAsync resourceManager;
+  private final AMRMClientAsync<AMRMClient.ContainerRequest> resourceManager;
   private final NMClientAsync nodeManager;
   private final REEFEventHandlers reefEventHandlers;
   private final Containers containers;
@@ -87,19 +83,20 @@ final class YarnContainerManager
   private final InjectionFuture<ProgressProvider> progressProvider;
 
   @Inject
-  YarnContainerManager(
-      final YarnConfiguration yarnConf,
+  private YarnContainerManager(
       @Parameter(YarnHeartbeatPeriod.class) final int yarnRMHeartbeatPeriod,
+      @Parameter(JobSubmissionDirectory.class) final String 
jobSubmissionDirectory,
+      final YarnConfiguration yarnConf,
       final REEFEventHandlers reefEventHandlers,
       final Containers containers,
       final ApplicationMasterRegistration registration,
       final ContainerRequestCounter containerRequestCounter,
       final DriverStatusManager driverStatusManager,
       final REEFFileNames reefFileNames,
-      @Parameter(JobSubmissionDirectory.class) final String 
jobSubmissionDirectory,
       final TrackingURLProvider trackingURLProvider,
       final RackNameFormatter rackNameFormatter,
       final InjectionFuture<ProgressProvider> progressProvider) throws 
IOException {
+
     this.reefEventHandlers = reefEventHandlers;
     this.driverStatusManager = driverStatusManager;
 
@@ -110,43 +107,56 @@ final class YarnContainerManager
     this.trackingURLProvider = trackingURLProvider;
     this.rackNameFormatter = rackNameFormatter;
 
-
     this.yarnClient.init(this.yarnConf);
 
     this.resourceManager = 
AMRMClientAsync.createAMRMClientAsync(yarnRMHeartbeatPeriod, this);
     this.nodeManager = new NMClientAsyncImpl(this);
+
     this.jobSubmissionDirectory = jobSubmissionDirectory;
     this.reefFileNames = reefFileNames;
     this.progressProvider = progressProvider;
+
     LOG.log(Level.FINEST, "Instantiated YarnContainerManager");
   }
 
-
+  /**
+   * RM Callback: RM reports some completed containers. Update status of each 
container in the list.
+   * @param completedContainers list of completed containers.
+   */
   @Override
-  public void onContainersCompleted(final List<ContainerStatus> 
containerStatuses) {
-    for (final ContainerStatus containerStatus : containerStatuses) {
-      onContainerStatus(containerStatus);
+  public void onContainersCompleted(final List<ContainerStatus> 
completedContainers) {
+    for (final ContainerStatus containerStatus : completedContainers) {
+      this.onContainerStatus(containerStatus);
     }
   }
 
+  /**
+   * RM Callback: RM reports that some containers have been allocated.
+   * @param allocatedContainers list of containers newly allocated by RM.
+   */
   @Override
-  @SuppressWarnings("checkstyle:hiddenfield")
-  public void onContainersAllocated(final List<Container> containers) {
+  public void onContainersAllocated(final List<Container> allocatedContainers) 
{
+
+    String id = null; // ID is used for logging only
 
-    // ID is used for logging only
-    final String id = String.format("%s:%d",
-        Thread.currentThread().getName().replace(' ', '_'), 
System.currentTimeMillis());
+    if (LOG.isLoggable(Level.FINE)) {
 
-    LOG.log(Level.FINE, "TIME: Allocated Containers {0} {1} of {2}",
-        new Object[]{id, containers.size(), 
this.containerRequestCounter.get()});
+      id = String.format("%s:%d", Thread.currentThread().getName().replace(' 
', '_'), System.currentTimeMillis());
 
-    for (final Container container : containers) {
-      handleNewContainer(container);
+      LOG.log(Level.FINE, "TIME: Allocated Containers {0} {1} of {2}",
+          new Object[] {id, allocatedContainers.size(), 
this.containerRequestCounter.get()});
+    }
+
+    for (final Container container : allocatedContainers) {
+      this.handleNewContainer(container);
     }
 
     LOG.log(Level.FINE, "TIME: Processed Containers {0}", id);
   }
 
+  /**
+   * RM Callback: RM requests application shutdown.
+   */
   @Override
   public void onShutdownRequest() {
     this.reefEventHandlers.onRuntimeStatus(RuntimeStatusEventImpl.newBuilder()
@@ -154,14 +164,23 @@ final class YarnContainerManager
     this.driverStatusManager.onError(new Exception("Shutdown requested by 
YARN."));
   }
 
+  /**
+   * RM Callback: RM reports status change of some nodes.
+   * @param nodeReports list of nodes with changed status.
+   */
   @Override
   public void onNodesUpdated(final List<NodeReport> nodeReports) {
     for (final NodeReport nodeReport : nodeReports) {
       this.nodeIdToRackName.put(nodeReport.getNodeId().toString(), 
nodeReport.getRackName());
-      onNodeReport(nodeReport);
+      this.onNodeReport(nodeReport);
     }
   }
 
+  /**
+   * RM Callback: Report application progress to RM.
+   * Progress is a floating point number between 0 and 1.
+   * @return a floating point number between 0 and 1.
+   */
   @Override
   public float getProgress() {
     try {
@@ -174,52 +193,81 @@ final class YarnContainerManager
     }
   }
 
+  /**
+   * RM Callback: RM reports an error.
+   * @param throwable An exception thrown from RM.
+   */
   @Override
   public void onError(final Throwable throwable) {
-    onRuntimeError(throwable);
+    this.onRuntimeError(throwable);
   }
 
+  /**
+   * NM Callback: NM accepts the starting container request.
+   * @param containerId ID of a new container being started.
+   * @param stringByteBufferMap a Map between the auxiliary service names and 
their outputs. Not used.
+   */
   @Override
-  public void onContainerStarted(
-      final ContainerId containerId, final Map<String, ByteBuffer> 
stringByteBufferMap) {
+  public void onContainerStarted(final ContainerId containerId, final 
Map<String, ByteBuffer> stringByteBufferMap) {
     final Optional<Container> container = 
this.containers.getOptional(containerId.toString());
     if (container.isPresent()) {
       this.nodeManager.getContainerStatusAsync(containerId, 
container.get().getNodeId());
     }
   }
 
+  /**
+   * NM Callback: NM reports container status.
+   * @param containerId ID of a container with the status being reported.
+   * @param containerStatus YARN container status.
+   */
   @Override
-  public void onContainerStatusReceived(
-      final ContainerId containerId, final ContainerStatus containerStatus) {
+  public void onContainerStatusReceived(final ContainerId containerId, final 
ContainerStatus containerStatus) {
     onContainerStatus(containerStatus);
   }
 
+  /**
+   * NM Callback: NM reports stop of a container.
+   * @param containerId ID of a container stopped.
+   */
   @Override
   public void onContainerStopped(final ContainerId containerId) {
     final boolean hasContainer = 
this.containers.hasContainer(containerId.toString());
     if (hasContainer) {
-      final ResourceStatusEventImpl.Builder resourceStatusBuilder =
-          
ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString());
-      resourceStatusBuilder.setState(State.DONE);
-      this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build());
+      this.reefEventHandlers.onResourceStatus(
+          ResourceStatusEventImpl.newBuilder()
+             .setIdentifier(containerId.toString())
+             .setState(State.DONE)
+             .build());
     }
   }
 
+  /**
+   * NM Callback: NM reports failure on container start.
+   * @param containerId ID of a container that has failed to start.
+   * @param throwable An error that caused container to fail.
+   */
   @Override
-  public void onStartContainerError(
-      final ContainerId containerId, final Throwable throwable) {
-    handleContainerError(containerId, throwable);
+  public void onStartContainerError(final ContainerId containerId, final 
Throwable throwable) {
+    this.handleContainerError(containerId, throwable);
   }
 
+  /**
+   * NM Callback: NM can not obtain status of the container.
+   * @param containerId ID of a container that failed to report its status.
+   * @param throwable An error that occurred when querying status of a 
container.
+   */
   @Override
-  public void onGetContainerStatusError(
-      final ContainerId containerId, final Throwable throwable) {
-    handleContainerError(containerId, throwable);
+  public void onGetContainerStatusError(final ContainerId containerId, final 
Throwable throwable) {
+    this.handleContainerError(containerId, throwable);
   }
 
+  /**
+   * NM Callback: NM fails to stop the container.
+   * @param containerId ID of the container that failed to stop.
+   * @param throwable An error that occurred when trying to stop the container.
+   */
   @Override
-  public void onStopContainerError(
-      final ContainerId containerId, final Throwable throwable) {
+  public void onStopContainerError(final ContainerId containerId, final 
Throwable throwable) {
     handleContainerError(containerId, throwable);
   }
 
@@ -250,11 +298,17 @@ final class YarnContainerManager
     updateRuntimeStatus();
   }
 
+  /**
+   * Start the YARN container manager.
+   * This method is called from DriverRuntimeStartHandler via 
YARNRuntimeStartHandler.
+   */
   void onStart() {
 
     this.yarnClient.start();
+
     this.resourceManager.init(this.yarnConf);
     this.resourceManager.start();
+
     this.nodeManager.init(this.yarnConf);
     this.nodeManager.start();
 
@@ -268,33 +322,45 @@ final class YarnContainerManager
     }
 
     try {
-      
this.registration.setRegistration(this.resourceManager.registerApplicationMaster(
-          "", 0, this.trackingURLProvider.getTrackingUrl()));
-      LOG.log(Level.FINE, "YARN registration: {0}", registration);
+
+      this.registration.setRegistration(
+          this.resourceManager.registerApplicationMaster("", 0, 
this.trackingURLProvider.getTrackingUrl()));
+
+      LOG.log(Level.FINE, "YARN registration: {0}", this.registration);
+
       final FileSystem fs = FileSystem.get(this.yarnConf);
       final Path outputFileName = new Path(this.jobSubmissionDirectory, 
this.reefFileNames.getDriverHttpEndpoint());
-      final FSDataOutputStream out = fs.create(outputFileName);
-      out.writeBytes(this.trackingURLProvider.getTrackingUrl() + "\n");
-      out.flush();
-      out.close();
+
+      try (final FSDataOutputStream out = fs.create(outputFileName)) {
+        out.writeBytes(this.trackingURLProvider.getTrackingUrl() + '\n');
+      }
+
     } catch (final YarnException | IOException e) {
       LOG.log(Level.WARNING, "Unable to register application master.", e);
       onRuntimeError(e);
     }
   }
 
+  /**
+   * Shut down YARN container manager.
+   * This method is called from DriverRuntimeStopHandler via 
YARNRuntimeStopHandler.
+   * @param exception Exception that caused driver to stop. Can be null if 
there was no error.
+   */
   void onStop(final Throwable exception) {
 
     LOG.log(Level.FINE, "Stop Runtime: RM status {0}", 
this.resourceManager.getServiceState());
 
     if (this.resourceManager.getServiceState() == Service.STATE.STARTED) {
+
       // invariant: if RM is still running then we declare success.
       try {
+
         this.reefEventHandlers.close();
+
         if (exception == null) {
-          this.resourceManager.unregisterApplicationMaster(
-              FinalApplicationStatus.SUCCEEDED, null, null);
+          
this.resourceManager.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
 null, null);
         } else {
+
           // Note: We don't allow RM to restart our applications if it's an 
application level failure.
           // If applications are to be long-running, they should catch 
Exceptions before the REEF level
           // instead of relying on the RM restart mechanism.
@@ -302,8 +368,8 @@ final class YarnContainerManager
           // to leak to this stage.
           final String failureMsg = String.format("Application failed due 
to:%n%s%n" +
               "With stack trace:%n%s", exception.getMessage(), 
ExceptionUtils.getStackTrace(exception));
-          this.resourceManager.unregisterApplicationMaster(
-              FinalApplicationStatus.FAILED, failureMsg, null);
+
+          
this.resourceManager.unregisterApplicationMaster(FinalApplicationStatus.FAILED, 
failureMsg, null);
         }
 
         this.resourceManager.close();
@@ -325,7 +391,9 @@ final class YarnContainerManager
   // HELPER METHODS
 
   private void onNodeReport(final NodeReport nodeReport) {
+
     LOG.log(Level.FINE, "Send node descriptor: {0}", nodeReport);
+
     
this.reefEventHandlers.onNodeDescriptor(NodeDescriptorEventImpl.newBuilder()
         .setIdentifier(nodeReport.getNodeId().toString())
         .setHostName(nodeReport.getNodeId().getHost())
@@ -337,19 +405,17 @@ final class YarnContainerManager
 
   private void handleContainerError(final ContainerId containerId, final 
Throwable throwable) {
 
-    final ResourceStatusEventImpl.Builder resourceStatusBuilder =
-        
ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString());
-
-    resourceStatusBuilder.setState(State.FAILED);
-    resourceStatusBuilder.setExitCode(1);
-    resourceStatusBuilder.setDiagnostics(throwable.getMessage());
-    this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build());
+    
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
+        .setIdentifier(containerId.toString())
+        .setState(State.FAILED)
+        .setExitCode(1)
+        .setDiagnostics(throwable.getMessage())
+        .build());
   }
 
   /**
    * Handles container status reports. Calls come from YARN.
-   *
-   * @param value containing the container status
+   * @param value containing the container status.
    */
   private void onContainerStatus(final ContainerStatus value) {
 
@@ -387,7 +453,7 @@ final class YarnContainerManager
         status.setDiagnostics(value.getDiagnostics());
       }
 
-      // The ResourceStatusHandler should close and release the Evaluator for 
us if the state is a terminal state.
+      // ResourceStatusHandler should close and release the Evaluator for us 
if the state is a terminal state.
       this.reefEventHandlers.onResourceStatus(status.build());
     }
   }
@@ -397,7 +463,7 @@ final class YarnContainerManager
     synchronized (this) {
       this.containerRequestCounter.incrementBy(containerRequests.length);
       this.requestsBeforeSentToRM.addAll(Arrays.asList(containerRequests));
-      doHomogeneousRequests();
+      this.doHomogeneousRequests();
     }
 
     this.updateRuntimeStatus();
@@ -405,55 +471,59 @@ final class YarnContainerManager
 
   /**
    * Handles new container allocations. Calls come from YARN.
-   *
-   * @param container newly allocated
+   * @param container newly allocated YARN container.
    */
   private void handleNewContainer(final Container container) {
 
     LOG.log(Level.FINE, "allocated container: id[ {0} ]", container.getId());
+
     synchronized (this) {
-      if (matchContainerWithPendingRequest(container)) {
-        final AMRMClient.ContainerRequest matchedRequest = 
this.requestsAfterSentToRM.peek();
-        this.containerRequestCounter.decrement();
-        this.containers.add(container);
-
-        LOG.log(Level.FINEST, "{0} matched with {1}", new 
Object[]{container.toString(), matchedRequest.toString()});
-
-        // Due to the bug YARN-314 and the workings of AMRMCClientAsync, when 
x-priority m-capacity zero-container
-        // request and x-priority n-capacity nonzero-container request are 
sent together, where m > n, RM ignores
-        // the latter.
-        // Therefore it is necessary avoid sending zero-container request, 
even it means getting extra containers.
-        // It is okay to send nonzero m-capacity and n-capacity request 
together since bigger containers
-        // can be matched.
-        // TODO[JIRA REEF-42, REEF-942]: revisit this when implementing 
locality-strictness
-        // (i.e. a specific rack request can be ignored)
-        if (this.requestsAfterSentToRM.size() > 1) {
-          try {
-            this.resourceManager.removeContainerRequest(matchedRequest);
-          } catch (final Exception e) {
-            LOG.log(Level.WARNING, "Nothing to remove from Async AMRM client's 
queue, " +
-                "removal attempt failed with exception", e);
-          }
-        }
 
-        this.requestsAfterSentToRM.remove();
-        doHomogeneousRequests();
-
-        LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number 
= {1}",
-            new Object[]{container.getResource().getMemory(), 
container.getResource().getVirtualCores()});
-        
this.reefEventHandlers.onResourceAllocation(ResourceEventImpl.newAllocationBuilder()
-            .setIdentifier(container.getId().toString())
-            .setNodeId(container.getNodeId().toString())
-            .setResourceMemory(container.getResource().getMemory())
-            .setVirtualCores(container.getResource().getVirtualCores())
-            .setRackName(rackNameFormatter.getRackName(container))
-            .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME)
-            .build());
-        this.updateRuntimeStatus();
-      } else {
+      if (!matchContainerWithPendingRequest(container)) {
         LOG.log(Level.WARNING, "Got an extra container {0} that doesn't match, 
releasing...", container.getId());
         this.resourceManager.releaseAssignedContainer(container.getId());
+        return;
       }
+
+      final AMRMClient.ContainerRequest matchedRequest = 
this.requestsAfterSentToRM.peek();
+
+      this.containerRequestCounter.decrement();
+      this.containers.add(container);
+
+      LOG.log(Level.FINEST, "{0} matched with {1}", new Object[] {container, 
matchedRequest});
+
+      // Due to the bug YARN-314 and the workings of AMRMClientAsync, when 
x-priority m-capacity zero-container
+      // request and x-priority n-capacity nonzero-container request are sent 
together, where m > n, RM ignores
+      // the latter.
+      // Therefore it is necessary to avoid sending zero-container request, even 
if it means getting extra containers.
+      // It is okay to send nonzero m-capacity and n-capacity request together 
since bigger containers
+      // can be matched.
+      // TODO[JIRA REEF-42, REEF-942]: revisit this when implementing 
locality-strictness.
+      // (i.e. a specific rack request can be ignored)
+      if (this.requestsAfterSentToRM.size() > 1) {
+        try {
+          this.resourceManager.removeContainerRequest(matchedRequest);
+        } catch (final Exception e) {
+          LOG.log(Level.WARNING, "Error removing request from Async AMRM 
client queue: " + matchedRequest, e);
+        }
+      }
+
+      this.requestsAfterSentToRM.remove();
+      this.doHomogeneousRequests();
+
+      LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number = 
{1}",
+          new Object[] {container.getResource().getMemory(), 
container.getResource().getVirtualCores()});
+
+      
this.reefEventHandlers.onResourceAllocation(ResourceEventImpl.newAllocationBuilder()
+          .setIdentifier(container.getId().toString())
+          .setNodeId(container.getNodeId().toString())
+          .setResourceMemory(container.getResource().getMemory())
+          .setVirtualCores(container.getResource().getVirtualCores())
+          .setRackName(rackNameFormatter.getRackName(container))
+          .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME)
+          .build());
+
+      this.updateRuntimeStatus();
     }
   }
 
@@ -484,15 +554,19 @@ final class YarnContainerManager
    * up the allocation and in placing containers on other machines.
    */
   private boolean matchContainerWithPendingRequest(final Container container) {
+
     if (this.requestsAfterSentToRM.isEmpty()) {
       return false;
     }
 
     final AMRMClient.ContainerRequest request = 
this.requestsAfterSentToRM.peek();
+
     final boolean resourceCondition = container.getResource().getMemory() >= 
request.getCapability().getMemory();
+
     // TODO[JIRA REEF-35]: check vcores once YARN-2380 is resolved
     final boolean nodeCondition = request.getNodes() == null
         || request.getNodes().contains(container.getNodeId().getHost());
+
     final boolean rackCondition = request.getRacks() == null
         || 
request.getRacks().contains(this.nodeIdToRackName.get(container.getNodeId().toString()));
 
@@ -504,11 +578,10 @@ final class YarnContainerManager
    */
   private void updateRuntimeStatus() {
 
-    final RuntimeStatusEventImpl.Builder builder =
-        RuntimeStatusEventImpl.newBuilder()
-            .setName(RUNTIME_NAME)
-            .setState(State.RUNNING)
-            
.setOutstandingContainerRequests(this.containerRequestCounter.get());
+    final RuntimeStatusEventImpl.Builder builder = 
RuntimeStatusEventImpl.newBuilder()
+        .setName(RUNTIME_NAME)
+        .setState(State.RUNNING)
+        .setOutstandingContainerRequests(this.containerRequestCounter.get());
 
     for (final String allocatedContainerId : 
this.containers.getContainerIds()) {
       builder.addContainerAllocation(allocatedContainerId);
@@ -522,26 +595,25 @@ final class YarnContainerManager
     // SHUTDOWN YARN
     try {
       this.reefEventHandlers.close();
-      this.resourceManager.unregisterApplicationMaster(
-          FinalApplicationStatus.FAILED, throwable.getMessage(), null);
+      
this.resourceManager.unregisterApplicationMaster(FinalApplicationStatus.FAILED, 
throwable.getMessage(), null);
     } catch (final Exception e) {
       LOG.log(Level.WARNING, "Error shutting down YARN application", e);
     } finally {
       this.resourceManager.stop();
     }
 
-    final RuntimeStatusEventImpl.Builder runtimeStatusBuilder = 
RuntimeStatusEventImpl.newBuilder()
-        .setState(State.FAILED)
-        .setName(RUNTIME_NAME);
-
-    final Encoder<Throwable> codec = new ObjectSerializableCodec<>();
-    
runtimeStatusBuilder.setError(ReefServiceProtos.RuntimeErrorProto.newBuilder()
-        .setName(RUNTIME_NAME)
-        .setMessage(throwable.getMessage())
-        .setException(ByteString.copyFrom(codec.encode(throwable)))
-        .build())
-        .build();
+    final ReefServiceProtos.RuntimeErrorProto runtimeError =
+        ReefServiceProtos.RuntimeErrorProto.newBuilder()
+            .setName(RUNTIME_NAME)
+            .setMessage(throwable.getMessage())
+            .setException(ByteString.copyFrom(new 
ObjectSerializableCodec<>().encode(throwable)))
+            .build();
 
-    this.reefEventHandlers.onRuntimeStatus(runtimeStatusBuilder.build());
+    this.reefEventHandlers.onRuntimeStatus(
+        RuntimeStatusEventImpl.newBuilder()
+            .setState(State.FAILED)
+            .setName(RUNTIME_NAME)
+            .setError(runtimeError)
+            .build());
   }
 }

Reply via email to