Repository: reef
Updated Branches:
  refs/heads/master a9d6d6a66 -> 719e64459


[REEF-1557] Refactor State, EvaluatorState, and related classes.

  * Move all state-checking logic into corresponding enums.
  * Make state atomic in EvaluatorStatusManager.
  * Add javadocs and clean up the code.
  * Refactor EvaluatorManager for readability.

JIRA:
  [REEF-1557](https://issues.apache.org/jira/browse/REEF-1557)

Pull request:
  This closes #1118


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/719e6445
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/719e6445
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/719e6445

Branch: refs/heads/master
Commit: 719e6445951b0214c1b687ac85def666e5bc2c4a
Parents: a9d6d6a
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Fri Sep 9 11:43:02 2016 -0700
Committer: Mariia Mykhailova <mar...@apache.org>
Committed: Fri Sep 16 15:31:13 2016 -0700

----------------------------------------------------------------------
 .../driver/restart/EvaluatorRestartInfo.java    |  23 +-
 .../driver/restart/EvaluatorRestartState.java   |  37 ++-
 .../runtime/common/driver/DriverStatus.java     |  54 ++++-
 .../common/driver/DriverStatusManager.java      |  77 +++---
 .../driver/evaluator/EvaluatorManager.java      | 240 +++++++++++--------
 .../common/driver/evaluator/EvaluatorState.java | 156 +++++++++++-
 .../evaluator/EvaluatorStatusManager.java       | 218 ++++++++---------
 .../evaluator/pojos/EvaluatorStatusPOJO.java    |  48 +---
 .../common/driver/evaluator/pojos/State.java    | 121 +++++++++-
 .../driver/evaluator/pojos/TaskStatusPOJO.java  |  70 ++----
 .../common/driver/idle/DriverIdleManager.java   |   2 +-
 .../resourcemanager/ResourceManagerStatus.java  |  72 +-----
 12 files changed, 676 insertions(+), 442 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
index 8f35af5..27baa61 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
@@ -33,7 +33,9 @@ import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEven
 @DriverSide
 @Unstable
 public final class EvaluatorRestartInfo {
+
   private final ResourceRecoverEvent resourceRecoverEvent;
+
   private EvaluatorRestartState evaluatorRestartState;
 
   /**
@@ -44,11 +46,19 @@ public final class EvaluatorRestartInfo {
     return new EvaluatorRestartInfo(resourceRecoverEvent, 
EvaluatorRestartState.EXPECTED);
   }
 
+  private EvaluatorRestartInfo(
+      final ResourceRecoverEvent resourceRecoverEvent, final 
EvaluatorRestartState evaluatorRestartState) {
+
+    this.resourceRecoverEvent = resourceRecoverEvent;
+    this.evaluatorRestartState = evaluatorRestartState;
+  }
+
   /**
    * Creates an {@link EvaluatorRestartInfo} object that represents the 
information of an evaluator that
    * has failed on driver restart.
    */
   public static EvaluatorRestartInfo createFailedEvaluatorInfo(final String 
evaluatorId) {
+
     final ResourceRecoverEvent resourceRecoverEvent =
         
ResourceEventImpl.newRecoveryBuilder().setIdentifier(evaluatorId).build();
 
@@ -61,31 +71,26 @@ public final class EvaluatorRestartInfo {
    * recovered evaluator on restart.
    */
   public ResourceRecoverEvent getResourceRecoverEvent() {
-    return resourceRecoverEvent;
+    return this.resourceRecoverEvent;
   }
 
   /**
    * @return the current process of the restart.
    */
   public EvaluatorRestartState getEvaluatorRestartState() {
-    return evaluatorRestartState;
+    return this.evaluatorRestartState;
   }
 
   /**
    * sets the current process of the restart.
    */
   public boolean setEvaluatorRestartState(final EvaluatorRestartState to) {
-    if (EvaluatorRestartState.isLegalTransition(evaluatorRestartState, to)) {
+
+    if (this.evaluatorRestartState.isLegalTransition(to)) {
       this.evaluatorRestartState = to;
       return true;
     }
 
     return false;
   }
-
-  private EvaluatorRestartInfo(final ResourceRecoverEvent resourceRecoverEvent,
-                               final EvaluatorRestartState 
evaluatorRestartState) {
-    this.resourceRecoverEvent = resourceRecoverEvent;
-    this.evaluatorRestartState = evaluatorRestartState;
-  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
index a9b2d94..c48c494 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
@@ -29,6 +29,7 @@ import org.apache.reef.annotations.audience.Private;
 @DriverSide
 @Unstable
 public enum EvaluatorRestartState {
+
   /**
    * The evaluator is not a restarted instance. Not expecting.
    */
@@ -65,32 +66,51 @@ public enum EvaluatorRestartState {
   FAILED;
 
   /**
+   * Check if the transition of {@link EvaluatorRestartState} from one state 
to another is legal.
+   * @param fromState start state.
+   * @param toState destination state.
    * @return true if the transition of {@link EvaluatorRestartState} is legal.
+   * @deprecated TODO[JIRA REEF-1560] Use non-static method instead. Remove 
after version 0.16
    */
-  public static boolean isLegalTransition(final EvaluatorRestartState from, 
final EvaluatorRestartState to) {
-    switch(from) {
+  @Deprecated
+  public static boolean isLegalTransition(
+      final EvaluatorRestartState fromState, final EvaluatorRestartState 
toState) {
+    return fromState.isLegalTransition(toState);
+  }
+
+  /**
+   * Check if the transition of {@link EvaluatorRestartState} from current 
state to the given one is legal.
+   * @param toState destination state.
+   * @return true if the transition is legal, false otherwise.
+   */
+  public final boolean isLegalTransition(final EvaluatorRestartState toState) {
+
+    switch(this) {
     case EXPECTED:
-      switch(to) {
+      switch(toState) {
       case EXPIRED:
       case REPORTED:
         return true;
       default:
         return false;
       }
+
     case REPORTED:
-      switch(to) {
+      switch(toState) {
       case REREGISTERED:
         return true;
       default:
         return false;
       }
+
     case REREGISTERED:
-      switch(to) {
+      switch(toState) {
       case PROCESSED:
         return true;
       default:
         return false;
       }
+
     default:
       return false;
     }
@@ -135,4 +155,11 @@ public enum EvaluatorRestartState {
       return false;
     }
   }
+
+  /**
+   * @return true if the evaluator has had its recovery heartbeat processed.
+   */
+  public boolean isReregistered() {
+    return this == REREGISTERED;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
index 1999329..922137f 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
@@ -22,9 +22,61 @@ package org.apache.reef.runtime.common.driver;
  * The status of the Driver.
  */
 public enum DriverStatus {
+
   PRE_INIT,
   INIT,
   RUNNING,
   SHUTTING_DOWN,
-  FAILING
+  FAILING;
+
+  /**
+   * Check if the driver is in process of shutting down (either gracefully or 
due to an error).
+   * @return true if the driver is shutting down (gracefully or otherwise).
+   */
+  public boolean isClosing() {
+    return this == SHUTTING_DOWN || this == FAILING;
+  }
+
+  /**
+   * Check whether a driver state transition from current state to a given one 
is legal.
+   * @param toStatus Destination state.
+   * @return true if transition is valid, false otherwise.
+   */
+  public boolean isLegalTransition(final DriverStatus toStatus) {
+
+    switch (this) {
+
+    case PRE_INIT:
+      switch (toStatus) {
+      case INIT:
+        return true;
+      default:
+        return false;
+      }
+
+    case INIT:
+      switch (toStatus) {
+      case RUNNING:
+        return true;
+      default:
+        return false;
+      }
+
+    case RUNNING:
+      switch (toStatus) {
+      case SHUTTING_DOWN:
+      case FAILING:
+        return true;
+      default:
+        return false;
+      }
+
+    case FAILING:
+    case SHUTTING_DOWN:
+      return false;
+
+    default:
+      throw new IllegalStateException("Unknown input state: " + this);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
index 129e702..6084431 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
@@ -76,44 +76,6 @@ public final class DriverStatusManager {
   }
 
   /**
-   * Check whether a state transition 'from->to' is legal.
-   * @param from Source state.
-   * @param to Destination state.
-   * @return true if transition is valid, false otherwise.
-   */
-  private static boolean isLegalTransition(final DriverStatus from, final 
DriverStatus to) {
-    switch (from) {
-    case PRE_INIT:
-      switch (to) {
-      case INIT:
-        return true;
-      default:
-        return false;
-      }
-    case INIT:
-      switch (to) {
-      case RUNNING:
-        return true;
-      default:
-        return false;
-      }
-    case RUNNING:
-      switch (to) {
-      case SHUTTING_DOWN:
-      case FAILING:
-        return true;
-      default:
-        return false;
-      }
-    case FAILING:
-    case SHUTTING_DOWN:
-      return false;
-    default:
-      throw new IllegalStateException("Unknown input state: " + from);
-    }
-  }
-
-  /**
    * Changes the driver status to INIT and sends message to the client about 
the transition.
    */
   public synchronized void onInit() {
@@ -134,7 +96,7 @@ public final class DriverStatusManager {
 
     LOG.entering(CLASS_NAME, "onRunning");
 
-    if (this.driverStatus.equals(DriverStatus.PRE_INIT)) {
+    if (this.driverStatus == DriverStatus.PRE_INIT) {
       this.onInit();
     }
 
@@ -152,7 +114,7 @@ public final class DriverStatusManager {
 
     LOG.entering(CLASS_NAME, "onError", exception);
 
-    if (this.isShuttingDownOrFailing()) {
+    if (this.isClosing()) {
       LOG.log(Level.WARNING, "Received an exception while already in 
shutdown.", exception);
     } else {
       LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", 
exception);
@@ -171,15 +133,18 @@ public final class DriverStatusManager {
 
     LOG.entering(CLASS_NAME, "onComplete");
 
-    if (this.isShuttingDownOrFailing()) {
+    if (this.isClosing()) {
       LOG.log(Level.WARNING, "Ignoring second call to onComplete()",
           new Exception("Dummy exception to get the call stack"));
     } else {
+
       LOG.log(Level.INFO, "Clean shutdown of the Driver.");
+
       if (LOG.isLoggable(Level.FINEST)) {
         LOG.log(Level.FINEST, "Call stack: ",
             new Exception("Dummy exception to get the call stack"));
       }
+
       this.clock.close();
       this.setStatus(DriverStatus.SHUTTING_DOWN);
     }
@@ -201,9 +166,10 @@ public final class DriverStatusManager {
    * @deprecated TODO[JIRA REEF-1548] Do not use DriverStatusManager as a 
proxy to the job client.
    * After release 0.16, make this method private and use it inside 
onRuntimeStop() method instead.
    */
+  @Deprecated
   public synchronized void sendJobEndingMessageToClient(final 
Optional<Throwable> exception) {
 
-    if (!this.isShuttingDownOrFailing()) {
+    if (!this.isClosing()) {
       LOG.log(Level.SEVERE, "Sending message in a state different that 
SHUTTING_DOWN or FAILING. " +
           "This is likely a illegal call to clock.close() at play. Current 
state: {0}", this.driverStatus);
     }
@@ -234,21 +200,34 @@ public final class DriverStatusManager {
     this.driverTerminationHasBeenCommunicatedToClient = true;
   }
 
+  /**
+   * Check if the driver is in process of shutting down (either gracefully or 
due to an error).
+   * @return true if the driver is shutting down (gracefully or otherwise).
+   * @deprecated TODO[JIRA REEF-1560] Use isClosing() method instead. Remove 
after version 0.16
+   */
+  @Deprecated
   public synchronized boolean isShuttingDownOrFailing() {
-    return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus)
-        || DriverStatus.FAILING.equals(this.driverStatus);
+    return this.isClosing();
+  }
+
+  /**
+   * Check if the driver is in process of shutting down (either gracefully or 
due to an error).
+   * @return true if the driver is shutting down (gracefully or otherwise).
+   */
+  public synchronized boolean isClosing() {
+    return this.driverStatus.isClosing();
   }
 
   /**
    * Helper method to set the status.
    * This also checks whether the transition from the current status to the 
new one is legal.
-   * @param newStatus Driver status to transition to.
+   * @param toStatus Driver status to transition to.
    */
-  private synchronized void setStatus(final DriverStatus newStatus) {
-    if (isLegalTransition(this.driverStatus, newStatus)) {
-      this.driverStatus = newStatus;
+  private synchronized void setStatus(final DriverStatus toStatus) {
+    if (this.driverStatus.isLegalTransition(toStatus)) {
+      this.driverStatus = toStatus;
     } else {
-      LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new 
Object[] {this.driverStatus, newStatus});
+      LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new 
Object[] {this.driverStatus, toStatus});
     }
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index d4b8997..fc77380 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -25,6 +25,7 @@ import 
org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
 import org.apache.reef.driver.restart.DriverRestartManager;
 import org.apache.reef.driver.restart.EvaluatorRestartState;
 import org.apache.reef.exception.NonSerializableException;
+import org.apache.reef.runtime.common.driver.api.*;
 import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
 import 
org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO;
 import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
@@ -41,10 +42,6 @@ import org.apache.reef.io.naming.Identifiable;
 import org.apache.reef.proto.EvaluatorRuntimeProtocol;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.driver.evaluator.EvaluatorProcess;
-import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
-import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl;
-import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
-import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
 import org.apache.reef.runtime.common.driver.context.ContextControlHandler;
 import org.apache.reef.runtime.common.driver.context.ContextRepresenters;
 import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
@@ -111,17 +108,19 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
 
   // Mutable fields
   private Optional<TaskRepresenter> task = Optional.empty();
-  private boolean isResourceReleased = false;
-  private boolean allocationFired = false;
+  private boolean resourceNotReleased = true;
+  private boolean allocationNotFired = true;
 
   @Inject
   private EvaluatorManager(
+      @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
+      @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl 
evaluatorDescriptor,
+      @Parameter(EvaluatorConfigurationProviders.class)
+        final Set<ConfigurationProvider> evaluatorConfigurationProviders,
       final Clock clock,
       final RemoteManager remoteManager,
       final ResourceReleaseHandler resourceReleaseHandler,
       final ResourceLaunchHandler resourceLaunchHandler,
-      @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
-      @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl 
evaluatorDescriptor,
       final ContextRepresenters contextRepresenters,
       final ConfigurationSerializer configurationSerializer,
       final EvaluatorMessageDispatcher messageDispatcher,
@@ -131,18 +130,20 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
       final ExceptionCodec exceptionCodec,
       final EventHandlerIdlenessSource idlenessSource,
       final LoggingScopeFactory loggingScopeFactory,
-      @Parameter(EvaluatorConfigurationProviders.class)
-      final Set<ConfigurationProvider> evaluatorConfigurationProviders,
       final DriverRestartManager driverRestartManager,
       final EvaluatorIdlenessThreadPool idlenessThreadPool) {
-    this.contextRepresenters = contextRepresenters;
-    this.idlenessSource = idlenessSource;
+
     LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: 
{0}", evaluatorId);
+
+    this.evaluatorId = evaluatorId;
+    this.evaluatorDescriptor = evaluatorDescriptor;
+    this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
+
     this.clock = clock;
+    this.contextRepresenters = contextRepresenters;
+    this.idlenessSource = idlenessSource;
     this.resourceReleaseHandler = resourceReleaseHandler;
     this.resourceLaunchHandler = resourceLaunchHandler;
-    this.evaluatorId = evaluatorId;
-    this.evaluatorDescriptor = evaluatorDescriptor;
 
     this.messageDispatcher = messageDispatcher;
     this.evaluatorControlHandler = evaluatorControlHandler;
@@ -153,7 +154,6 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
     this.remoteManager = remoteManager;
     this.configurationSerializer = configurationSerializer;
     this.loggingScopeFactory = loggingScopeFactory;
-    this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
     this.driverRestartManager = driverRestartManager;
     this.idlenessThreadPool = idlenessThreadPool;
 
@@ -182,28 +182,27 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
    * Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once.
    */
   public synchronized void fireEvaluatorAllocatedEvent() {
-    if (!allocationFired && stateManager.isAllocated()) {
+
+    if (this.stateManager.isAllocated() && this.allocationNotFired) {
+
       final AllocatedEvaluator allocatedEvaluator =
           new AllocatedEvaluatorImpl(this,
-              remoteManager.getMyIdentifier(),
-              configurationSerializer,
+              this.remoteManager.getMyIdentifier(),
+              this.configurationSerializer,
               getJobIdentifier(),
-              loggingScopeFactory,
-              evaluatorConfigurationProviders);
-      LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator 
with ID [{0}]", evaluatorId);
-      messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
-      allocationFired = true;
+              this.loggingScopeFactory,
+              this.evaluatorConfigurationProviders);
+
+      LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator 
with ID [{0}]", this.evaluatorId);
+
+      this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
+      this.allocationNotFired = false;
+
     } else {
-      LOG.log(Level.WARNING, "Evaluator allocated event fired more than 
once.");
+      LOG.log(Level.WARNING, "AllocatedEvaluator event fired more than once.");
     }
   }
 
-  private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent 
resourceStatusEvent) {
-    return resourceStatusEvent.getState() == State.DONE ||
-        resourceStatusEvent.getState() == State.FAILED ||
-        resourceStatusEvent.getState() == State.KILLED;
-  }
-
   @Override
   public String getId() {
     return this.evaluatorId;
@@ -219,58 +218,63 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
 
   @Override
   public void close() {
+
     synchronized (this.evaluatorDescriptor) {
-      if (this.stateManager.isAllocatedOrSubmittedOrRunning()) {
+
+      if (this.stateManager.isAvailable()) {
+
         LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", 
getId());
+
         try {
-          if (this.stateManager.isRunning()){
+
+          if (this.stateManager.isRunning()) {
+
             // Killing the evaluator means that it doesn't need to send a 
confirmation; it just dies.
-            final EvaluatorRuntimeProtocol.EvaluatorControlProto 
evaluatorControlProto =
+            this.sendEvaluatorControlMessage(
                 EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
                     .setTimestamp(System.currentTimeMillis())
                     .setIdentifier(getId())
                     
.setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
-                    .build();
-            sendEvaluatorControlMessage(evaluatorControlProto);
+                    .build());
+
             this.stateManager.setClosing();
+
           } else {
             this.stateManager.setKilled();
           }
+
         } catch (Exception e) {
           LOG.log(Level.WARNING, "Exception occurred when manager sends 
killing message to task.", e);
           this.stateManager.setKilled();
         }
       }
 
-      if (!this.isResourceReleased) {
-        this.isResourceReleased = true;
+      if (this.resourceNotReleased) {
+
+        this.resourceNotReleased = false;
+
+        final ResourceReleaseEvent releaseEvent = 
ResourceReleaseEventImpl.newBuilder()
+            .setIdentifier(this.evaluatorId)
+            .setRuntimeName(this.getEvaluatorDescriptor().getRuntimeName())
+            .build();
+
         try {
-        /* We need to wait awhile before returning the container to the RM in 
order to
-         * give the EvaluatorRuntime (and Launcher) time to cleanly exit. */
+          // We need to wait awhile before returning the container to the RM
+          // in order to give the EvaluatorRuntime (and Launcher) time to 
cleanly exit.
           this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
             @Override
             public void onNext(final Alarm alarm) {
-              EvaluatorManager.this.resourceReleaseHandler.onNext(
-                  ResourceReleaseEventImpl.newBuilder()
-                      .setIdentifier(EvaluatorManager.this.evaluatorId)
-                      
.setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
-                      .build()
-              );
+              resourceReleaseHandler.onNext(releaseEvent);
             }
           });
         } catch (final IllegalStateException e) {
           LOG.log(Level.WARNING, "Force resource release because the client 
closed the clock.", e);
-          EvaluatorManager.this.resourceReleaseHandler.onNext(
-              ResourceReleaseEventImpl.newBuilder()
-                  .setIdentifier(EvaluatorManager.this.evaluatorId)
-                  
.setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
-                  .build()
-          );
+          this.resourceReleaseHandler.onNext(releaseEvent);
         }
       }
     }
 
-    idlenessThreadPool.runCheckAsync(this);
+    this.idlenessThreadPool.runCheckAsync(this);
   }
 
   /**
@@ -278,8 +282,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
    * <em>and</em> there are no messages queued or in processing.
    */
   public boolean isClosed() {
-    return this.messageDispatcher.isEmpty() &&
-        this.stateManager.isDoneOrFailedOrKilled();
+    return this.messageDispatcher.isEmpty() && this.stateManager.isCompleted();
   }
 
   /**
@@ -310,9 +313,11 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
    */
   public void onEvaluatorException(final EvaluatorException exception) {
     synchronized (this.evaluatorDescriptor) {
-      if (this.stateManager.isDoneOrFailedOrKilled()) {
-        LOG.log(Level.FINE, "Ignoring an exception received for Evaluator {0} 
which is already in state {1}.",
-            new Object[]{this.getId(), this.stateManager});
+
+      if (this.stateManager.isCompleted()) {
+        LOG.log(Level.FINE,
+            "Ignoring an exception received for Evaluator {0} which is already 
in state {1}.",
+            new Object[] {this.getId(), this.stateManager});
         return;
       }
 
@@ -324,6 +329,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
 
         final Optional<FailedTask> failedTaskOptional;
         if (this.task.isPresent()) {
+
           final String taskId = this.task.get().getId();
           final Optional<ActiveContext> evaluatorContext = Optional.empty();
           final Optional<byte[]> bytes = Optional.empty();
@@ -332,13 +338,15 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
           final Optional<String> description = Optional.empty();
           final FailedTask failedTask =
               new FailedTask(taskId, message, description, taskException, 
bytes, evaluatorContext);
+
           failedTaskOptional = Optional.of(failedTask);
+
         } else {
           failedTaskOptional = Optional.empty();
         }
 
-        final FailedEvaluator failedEvaluator = new 
FailedEvaluatorImpl(exception, failedContextList,
-            failedTaskOptional, this.evaluatorId);
+        final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(
+            exception, failedContextList, failedTaskOptional, 
this.evaluatorId);
 
         if 
(driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired())
 {
           
this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator);
@@ -362,18 +370,26 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
 
     final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto 
evaluatorHeartbeatProto =
         evaluatorHeartbeatProtoRemoteMessage.getMessage();
+
     LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto);
 
     synchronized (this.evaluatorDescriptor) {
-      if (this.stateManager.isDoneOrFailedOrKilled()) {
-        LOG.log(Level.FINE, "Ignoring a heartbeat received for Evaluator {0} 
which is already in state {1}.",
-            new Object[]{this.getId(), this.stateManager});
+
+      if (this.stateManager.isCompleted()) {
+
+        LOG.log(Level.FINE,
+            "Ignoring a heartbeat received for Evaluator {0} which is already 
in state {1}.",
+            new Object[] {this.getId(), this.stateManager});
+
         return;
-      } else if (this.stateManager.isAllocatedOrSubmittedOrRunning()) {
-        this.sanityChecker.check(evaluatorId, 
evaluatorHeartbeatProto.getTimestamp());
+
+      } else if (this.stateManager.isAvailable()) {
+
+        this.sanityChecker.check(this.evaluatorId, 
evaluatorHeartbeatProto.getTimestamp());
         final String evaluatorRID = 
evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
 
-        final EvaluatorRestartState evaluatorRestartState = 
driverRestartManager.getEvaluatorRestartState(evaluatorId);
+        final EvaluatorRestartState evaluatorRestartState =
+            
this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId);
 
         /*
          * First message from a running evaluator. The evaluator can be a new 
evaluator or be a previous evaluator
@@ -396,7 +412,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
           LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
 
           if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
-            driverRestartManager.setEvaluatorReregistered(evaluatorId);
+            
this.driverRestartManager.setEvaluatorReregistered(this.evaluatorId);
           }
         }
       }
@@ -406,8 +422,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
 
       // Process the Evaluator status message
       if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
-        EvaluatorStatusPOJO evaluatorStatus = new 
EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus());
-        this.onEvaluatorStatusMessage(evaluatorStatus);
+        this.onEvaluatorStatusMessage(new 
EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus()));
       }
 
       // Process the Context status message(s)
@@ -417,14 +432,13 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
         contextStatusList.add(new ContextStatusPOJO(proto, 
messageSequenceNumber));
       }
 
-      this.contextRepresenters.onContextStatusMessages(contextStatusList,
-          informClientOfNewContexts);
+      this.contextRepresenters.onContextStatusMessages(contextStatusList, 
informClientOfNewContexts);
 
       // Process the Task status message
       if (evaluatorHeartbeatProto.hasTaskStatus()) {
-        TaskStatusPOJO taskStatus = new 
TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber);
-        this.onTaskStatusMessage(taskStatus);
+        this.onTaskStatusMessage(new 
TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber));
       }
+
       LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", 
this.getId());
     }
   }
@@ -461,7 +475,9 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
    * @param message
    */
   private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) 
{
+
     assert message.getState() == State.DONE;
+
     LOG.log(Level.FINEST, "Evaluator {0} done.", getId());
 
     // Send an ACK to the Evaluator.
@@ -474,7 +490,8 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
 
     this.stateManager.setDone();
     this.messageDispatcher.onEvaluatorCompleted(new 
CompletedEvaluatorImpl(this.evaluatorId));
-    close();
+
+    this.close();
   }
 
   /**
@@ -483,22 +500,24 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
    * @param evaluatorStatus
    */
   private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO 
evaluatorStatus) {
-    assert evaluatorStatus.getState()
-            == State.FAILED;
+
+    assert evaluatorStatus.getState() == State.FAILED;
+
     final EvaluatorException evaluatorException;
+
     if (evaluatorStatus.hasError()) {
+
       final Optional<Throwable> exception =
           this.exceptionCodec.fromBytes(evaluatorStatus.getError());
-      if (exception.isPresent()) {
-        evaluatorException = new EvaluatorException(getId(), exception.get());
-      } else {
-        evaluatorException = new EvaluatorException(getId(),
-            new NonSerializableException("Exception sent, but can't be 
deserialized", evaluatorStatus.getError()));
-      }
+
+      evaluatorException = new EvaluatorException(getId(), 
exception.isPresent() ? exception.get() :
+          new NonSerializableException("Exception sent, but can't be 
deserialized", evaluatorStatus.getError()));
+
     } else {
       evaluatorException = new EvaluatorException(getId(), new Exception("No 
exception sent"));
     }
-    onEvaluatorException(evaluatorException);
+
+    this.onEvaluatorException(evaluatorException);
   }
 
   /**
@@ -507,8 +526,10 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
    * @param message
    */
   private synchronized void onEvaluatorKilled(final EvaluatorStatusPOJO 
message) {
+
     assert message.getState() == State.KILLED;
-    assert stateManager.isClosing();
+    assert this.stateManager.isClosing();
+
     LOG.log(Level.WARNING, "Evaluator {0} killed completely.", getId());
 
     this.stateManager.setKilled();
@@ -519,9 +540,9 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
       if (this.stateManager.isAllocated()) {
         this.stateManager.setSubmitted();
         this.resourceLaunchHandler.onNext(resourceLaunchEvent);
-      } else if (this.stateManager.isFailedOrKilled()) {
-        LOG.log(Level.WARNING, "Evaluator manager expected" + 
EvaluatorState.ALLOCATED +
-            " state but instead is in state " + this.stateManager);
+      } else if (this.stateManager.isCompletedAbnormally()) {
+        LOG.log(Level.WARNING, "Evaluator manager expected {0} state but 
instead is in state {1}",
+            new Object[] {EvaluatorState.ALLOCATED, this.stateManager});
       } else {
         throw new RuntimeException("Evaluator manager expected " + 
EvaluatorState.ALLOCATED +
             " state but instead is in state " + this.stateManager);
@@ -560,18 +581,17 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
   private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) {
 
     if (!(this.task.isPresent() && 
this.task.get().getId().equals(taskStatus.getTaskId()))) {
-      if (taskStatus.getState() == State.INIT ||
-          taskStatus.getState() == State.FAILED ||
-          taskStatus.getState() == State.RUNNING ||
-          driverRestartManager.getEvaluatorRestartState(evaluatorId) == 
EvaluatorRestartState.REREGISTERED) {
+
+      final State state = taskStatus.getState();
+      if (state.isRestartable() ||
+          
this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId).isReregistered())
 {
 
         // [REEF-308] exposes a bug where the .NET evaluator does not send its 
states in the right order
         // [REEF-289] is a related item which may fix the issue
-        if (taskStatus.getState() == State.RUNNING) {
+        if (state.isRunning()) {
           LOG.log(Level.WARNING,
-                  "Received a message of state " + 
ReefServiceProtos.State.RUNNING +
-                  " for Task " + taskStatus.getTaskId() +
-                  " before receiving its " + ReefServiceProtos.State.INIT + " 
state");
+              "Received a message of state {0} for Task {1} before receiving 
its {2} state",
+              new Object[] {State.RUNNING, taskStatus.getTaskId(), 
State.INIT});
         }
 
         // FAILED is a legal first state of a Task as it could have failed 
during construction.
@@ -583,11 +603,12 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
                 this.exceptionCodec,
                 this.driverRestartManager));
       } else {
-        throw new RuntimeException("Received a message of state " + 
taskStatus.getState() +
+        throw new RuntimeException("Received a message of state " + state +
             ", not INIT, RUNNING, or FAILED for Task " + 
taskStatus.getTaskId() +
             " which we haven't heard from before.");
       }
     }
+
     this.task.get().onTaskStatusMessage(taskStatus);
 
     if (this.task.get().isNotRunning()) {
@@ -600,20 +621,28 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
    * Resource status information from the (actual) resource manager.
    */
   public void onResourceStatusMessage(final ResourceStatusEvent 
resourceStatusEvent) {
+
     synchronized (this.evaluatorDescriptor) {
-      LOG.log(Level.FINEST, "Resource manager state update: {0}", 
resourceStatusEvent.getState());
-      if (!this.stateManager.isAllocatedOrSubmittedOrRunning()) {
-        LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} 
which is already in state {1}.",
-            new Object[]{this.getId(), this.stateManager});
-      } else if (isDoneOrFailedOrKilled(resourceStatusEvent) && 
this.stateManager.isAllocatedOrSubmittedOrRunning()) {
-        // something is wrong. The resource manager reports that the Evaluator 
is done or failed, but the Driver assumes
-        // it to be alive.
+
+      final State state = resourceStatusEvent.getState();
+      LOG.log(Level.FINEST, "Resource manager state update: {0}", state);
+
+      if (!this.stateManager.isAvailable()) {
+
+        LOG.log(Level.FINE,
+            "Ignoring resource status update for Evaluator {0} which is 
already in state {1}.",
+            new Object[] {this.getId(), this.stateManager});
+
+      } else if (state.isCompleted() && this.stateManager.isAvailable()) {
+
+        // Something is wrong. The resource manager reports that the Evaluator 
is done or failed,
+        // but the Driver assumes it to be alive.
         final StringBuilder messageBuilder = new StringBuilder("Evaluator [")
             .append(this.evaluatorId)
             .append("] is assumed to be in state [")
             .append(this.stateManager.toString())
             .append("]. But the resource manager reports it to be in state [")
-            .append(resourceStatusEvent.getState())
+            .append(state)
             .append("].");
 
         if (this.stateManager.isSubmitted()) {
@@ -626,6 +655,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
           messageBuilder.append(" This means that the Evaluator failed but 
wasn't able to send an error message " +
               "back to the driver.");
         }
+
         if (this.task.isPresent()) {
           messageBuilder.append(" Task [")
               .append(this.task.get().getId())
@@ -633,8 +663,8 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
         }
 
         if (resourceStatusEvent.getState() == State.KILLED) {
-          this.onEvaluatorException(new 
EvaluatorKilledByResourceManagerException(this.evaluatorId,
-              messageBuilder.toString()));
+          this.onEvaluatorException(
+              new EvaluatorKilledByResourceManagerException(this.evaluatorId, 
messageBuilder.toString()));
         } else {
           this.onEvaluatorException(new EvaluatorException(this.evaluatorId, 
messageBuilder.toString()));
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
index 653486e..6ee80a8 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
@@ -22,17 +22,155 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 
 /**
- * Various states that the EvaluatorManager could be in. The EvaluatorManager 
is
- * created when a resource has been allocated by the ResourceManager.
+ * Various states that the EvaluatorManager could be in.
+ * The EvaluatorManager is created when a resource has been allocated by the 
ResourceManager.
  */
 @DriverSide
 @Private
 enum EvaluatorState {
-  ALLOCATED,  // initial state
-  SUBMITTED,  // client called AllocatedEvaluator.submitTask() and we're 
waiting for first contact
-  RUNNING,    // first contact received, all communication channels 
established, Evaluator sent to client.
-  CLOSING,    // evaluator is asked shutdown, but not closed yet.
-  DONE,       // clean shutdown
-  FAILED,     // some failure occurred.
-  KILLED      // unclean shutdown
+
+  /** Initial state. */
+  ALLOCATED,
+
+  /** Client called AllocatedEvaluator.submitTask() and we're waiting for 
first contact. */
+  SUBMITTED,
+
+  /** First contact received, all communication channels established, 
Evaluator sent to client. */
+  RUNNING,
+
+  /** Evaluator is asked to shut down, but has not closed yet. */
+  CLOSING,
+
+  /** Clean shutdown. */
+  DONE,
+
+  /** Some failure occurred. */
+  FAILED,
+
+  /** Unclean shutdown. */
+  KILLED;
+
+  /**
+   * Check if evaluator is in the initial state (ALLOCATED).
+   * @return true if ALLOCATED, false otherwise.
+   */
+  public final boolean isAllocated() {
+    return this == ALLOCATED;
+  }
+
+  /**
+   * Check if evaluator is in SUBMITTED state.
+   * @return true if SUBMITTED, false otherwise.
+   */
+  public final boolean isSubmitted() {
+    return this == SUBMITTED;
+  }
+
+  /**
+   * Check if the evaluator is in running state.
+   * @return true if RUNNING, false otherwise.
+   */
+  public final boolean isRunning() {
+    return this == RUNNING;
+  }
+
+  /**
+   * Check if the evaluator is in the process of being shut down.
+   * @return true if evaluator is being closed, false otherwise.
+   */
+  public final boolean isClosing() {
+    return this == CLOSING;
+  }
+
+  /**
+   * Check if evaluator is in one of the active states (ALLOCATED, SUBMITTED, 
or RUNNING).
+   * @return true if evaluator is available, false if it is closed or in the 
process of being shut down.
+   */
+  public final boolean isAvailable() {
+    return this == ALLOCATED || this == SUBMITTED || this == RUNNING;
+  }
+
+  /**
+   * Check if the evaluator is stopped. That is, in one of the DONE, FAILED, 
or KILLED states.
+   * @return true if evaluator completed, false if it is still available or in 
the process of being shut down.
+   */
+  public final boolean isCompleted() {
+    return this == DONE || this == FAILED || this == KILLED;
+  }
+
+  /**
+   * Check if the evaluator is closed due to an error. That is, in FAILED or 
KILLED state.
+   * @return true if evaluator is stopped due to an error, false otherwise.
+   */
+  public final boolean isCompletedAbnormally() {
+    return this == FAILED || this == KILLED;
+  }
+
+  /**
+   * Check if transition from current state to the given one is legal.
+   * @param toState new state to transition to.
+   * @return true if transition is legal, false otherwise.
+   */
+  public final boolean isLegalTransition(final EvaluatorState toState) {
+
+    if (this == toState) {
+      return true;
+    }
+
+    switch(this) {
+
+    case ALLOCATED:
+      switch(toState) {
+      case SUBMITTED:
+      case CLOSING:
+      case DONE:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+      }
+
+    case SUBMITTED:
+      switch(toState) {
+      case RUNNING:
+      case CLOSING:
+      case DONE:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+      }
+
+    case RUNNING:
+      switch(toState) {
+      case CLOSING:
+      case DONE:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+      }
+
+    case CLOSING:
+      switch(toState) {
+      case DONE:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+      }
+
+    case DONE:
+    case FAILED:
+    case KILLED:
+      return false;
+
+    default:
+      throw new RuntimeException("Unknown state: " + this);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
index a2b249a..bb78dbe 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
@@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 
 import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -31,160 +32,159 @@ import java.util.logging.Logger;
 @DriverSide
 @Private
 final class EvaluatorStatusManager {
+
   private static final Logger LOG = 
Logger.getLogger(EvaluatorStatusManager.class.getName());
+
   /**
    * The state managed.
    */
-  private EvaluatorState state = EvaluatorState.ALLOCATED;
+  private final AtomicReference<EvaluatorState> state = new 
AtomicReference<>(EvaluatorState.ALLOCATED);
 
   @Inject
   private EvaluatorStatusManager() {
     LOG.log(Level.FINE, "Instantiated 'EvaluatorStatusManager'");
   }
 
-  private static boolean isLegal(final EvaluatorState from, final 
EvaluatorState to) {
-    if (from == to) {
-      return true;
-    }
-
-    switch(from) {
-    case ALLOCATED: {
-      switch(to) {
-      case SUBMITTED:
-      case DONE:
-      case CLOSING:
-      case FAILED:
-        return true;
-      case KILLED:
-      case RUNNING:
-        break;
-      default:
-        throw new RuntimeException("Unknown state: " + to);
-      }
-    }
-    case SUBMITTED: {
-      switch(to) {
-      case RUNNING:
-      case DONE:
-      case CLOSING:
-      case FAILED:
-        return true;
-      case ALLOCATED:
-      case KILLED:
-        break;
-      default:
-        throw new RuntimeException("Unknown state: " + to);
-      }
-    }
-    case RUNNING: {
-      switch(to) {
-      case DONE:
-      case CLOSING:
-      case FAILED:
-        return true;
-      case ALLOCATED:
-      case SUBMITTED:
-      case KILLED:
-        break;
-      default:
-        throw new RuntimeException("Unknown state: " + to);
-      }
-    }
-    case CLOSING: {
-      switch(to) {
-      case KILLED:
-      case DONE:
-      case FAILED:
-        return true;
-      case ALLOCATED:
-      case SUBMITTED:
-      case RUNNING:
-        break;
-      default:
-        throw new RuntimeException("Unknown state: " + to);
-      }
-    }
-    case DONE:
-    case FAILED:
-    case KILLED:
-      break;
-    default:
-      throw new RuntimeException("Unknown state: " + from);
-    }
-
-    LOG.warning("Illegal evaluator state transition from " + from + " to " + 
to + ".");
-    return false;
-  }
-
-  private static boolean isDoneOrFailedOrKilled(final EvaluatorState state) {
-    return state == EvaluatorState.DONE ||
-           state == EvaluatorState.FAILED ||
-           state == EvaluatorState.KILLED;
-  }
-
-  synchronized void setRunning() {
+  void setRunning() {
     this.setState(EvaluatorState.RUNNING);
   }
 
-  synchronized void setSubmitted() {
+  void setSubmitted() {
     this.setState(EvaluatorState.SUBMITTED);
   }
 
-  synchronized void setClosing() {
+  void setClosing() {
     this.setState(EvaluatorState.CLOSING);
   }
 
-  synchronized void setDone() {
+  void setDone() {
     this.setState(EvaluatorState.DONE);
   }
 
-  synchronized void setFailed() {
+  void setFailed() {
     this.setState(EvaluatorState.FAILED);
   }
 
-  synchronized void setKilled() {
+  void setKilled() {
     this.setState(EvaluatorState.KILLED);
   }
 
-  synchronized boolean isRunning() {
-    return this.state.equals(EvaluatorState.RUNNING);
+  /**
+   * Check if evaluator is in the initial state (ALLOCATED).
+   * @return true if allocated, false otherwise.
+   */
+  boolean isAllocated() {
+    return this.state.get().isAllocated();
+  }
+
+  /**
+   * Check if evaluator is in SUBMITTED state.
+   * @return true if submitted, false otherwise.
+   */
+  boolean isSubmitted() {
+    return this.state.get().isSubmitted();
+  }
+
+  /**
+   * Check if the evaluator is in running state.
+   * @return true if RUNNING, false otherwise.
+   */
+  boolean isRunning() {
+    return this.state.get().isRunning();
   }
 
-  synchronized boolean isDoneOrFailedOrKilled() {
-    return isDoneOrFailedOrKilled(this.state);
+  /**
+   * Check if the evaluator is in the process of being shut down.
+   * @return true if evaluator is being closed, false otherwise.
+   */
+  boolean isClosing() {
+    return this.state.get().isClosing();
   }
 
-  synchronized boolean isAllocatedOrSubmittedOrRunning() {
-    return this.state == EvaluatorState.ALLOCATED ||
-           this.state == EvaluatorState.SUBMITTED ||
-           this.state == EvaluatorState.RUNNING;
+  /**
+   * Check if evaluator is in one of the active states (ALLOCATED, SUBMITTED, 
or RUNNING).
+   * @return true if evaluator is available, false if it is closed or in the 
process of being shut down.
+   * @deprecated TODO[JIRA REEF-1560] Use isAvailable() method instead. Remove 
after version 0.16
+   */
+  @Deprecated
+  boolean isAllocatedOrSubmittedOrRunning() {
+    return this.state.get().isAvailable();
   }
 
-  synchronized boolean isSubmitted() {
-    return EvaluatorState.SUBMITTED == this.state;
+  /**
+   * Check if evaluator is in one of the active states (ALLOCATED, SUBMITTED, 
or RUNNING).
+   * @return true if evaluator is available, false if it is closed or in the 
process of being shut down.
+   */
+  boolean isAvailable() {
+    return this.state.get().isAvailable();
   }
 
-  synchronized boolean isAllocated() {
-    return EvaluatorState.ALLOCATED == this.state;
+  /**
+   * Check if the evaluator is stopped. That is, in one of the DONE, FAILED, 
or KILLED states.
+   * @return true if evaluator completed, false if it is still available or in 
the process of being shut down.
+   * @deprecated TODO[JIRA REEF-1560] Use isCompleted() method instead. Remove 
after version 0.16
+   */
+  @Deprecated
+  boolean isDoneOrFailedOrKilled() {
+    return this.state.get().isCompleted();
   }
 
-  synchronized boolean isFailedOrKilled() {
-    return EvaluatorState.FAILED == this.state || EvaluatorState.KILLED == 
this.state;
+  /**
+   * Check if the evaluator is stopped. That is, in one of the DONE, FAILED, 
or KILLED states.
+   * @return true if evaluator completed, false if it is still available or in 
the process of being shut down.
+   */
+  boolean isCompleted() {
+    return this.state.get().isCompleted();
+  }
+
+  /**
+   * Check if the evaluator is closed due to an error. That is, in FAILED or 
KILLED state.
+   * @return true if evaluator is stopped due to an error, false otherwise.
+   * @deprecated TODO[JIRA REEF-1560] Use isCompletedAbnormally() method 
instead. Remove after version 0.16
+   */
+  @Deprecated
+  boolean isFailedOrKilled() {
+    return this.state.get().isCompletedAbnormally();
   }
 
-  synchronized boolean isClosing() {
-    return EvaluatorState.CLOSING == this.state;
+  /**
+   * Check if the evaluator is closed due to an error. That is, in FAILED or 
KILLED state.
+   * @return true if evaluator is stopped due to an error, false otherwise.
+   */
+  boolean isCompletedAbnormally() {
+    return this.state.get().isCompletedAbnormally();
   }
 
+  /**
+   * Return string representation of the current state of the Evaluator, like 
RUNNING or DONE.
+   * @return string representation of the current state of the Evaluator.
+   */
   @Override
-  public synchronized String toString() {
-    return this.state.toString();
+  public String toString() {
+    return this.state.get().toString();
   }
 
-  private synchronized void setState(final EvaluatorState state) {
-    if (!isLegal(this.state, state)) {
-      throw new IllegalStateException("Illegal state transition from '" + 
this.state + "' to '" + state + "'");
+  /**
+   * Transition to the new state of the evaluator, if possible.
+   * @param toState New state of the evaluator.
+   * @throws IllegalStateException if state transition is not valid.
+   */
+  private void setState(final EvaluatorState toState) {
+    while (true) {
+
+      final EvaluatorState fromState = this.state.get();
+      if (fromState == toState) {
+        break;
+      }
+
+      if (!fromState.isLegalTransition(toState)) {
+        LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new 
Object[] {fromState, toState});
+        throw new IllegalStateException("Illegal state transition: " + 
fromState + " -> " + toState);
+      }
+
+      if (this.state.compareAndSet(fromState, toState)) {
+        break;
+      }
     }
-    this.state = state;
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
index 661369c..0eff9bd 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
@@ -35,55 +35,31 @@ public final class EvaluatorStatusPOJO {
   private final State evaluatorState;
   private final byte[] errorBytes;
 
-
   public EvaluatorStatusPOJO(final ReefServiceProtos.EvaluatorStatusProto 
proto) {
-
-    evaluatorID = proto.getEvaluatorId();
-    evaluatorIdBytes = proto.getEvaluatorIdBytes().toByteArray();
-    evaluatorState = proto.hasState()? getStateFromProto(proto.getState()) : 
null;
-    errorBytes = proto.hasError() ? proto.getError().toByteArray() : null;
-
+    this.evaluatorID = proto.getEvaluatorId();
+    this.evaluatorIdBytes = proto.getEvaluatorIdBytes().toByteArray();
+    this.evaluatorState = proto.hasState() ? State.fromProto(proto.getState()) 
: null;
+    this.errorBytes = proto.hasError() ? proto.getError().toByteArray() : null;
   }
 
   /**
-   * @return true, if an evaluator has thrown an exception and sent it to a 
driver
+   * @return true, if an evaluator has thrown an exception and sent it to a 
driver.
    */
   public boolean hasError() {
-    return null != errorBytes;
+    return null != this.errorBytes;
   }
 
   /**
-   * @return serialized exception thrown by an evaluator
+   * @return serialized exception thrown by an evaluator.
    */
-  public byte[] getError(){
-    return errorBytes;
+  public byte[] getError() {
+    return this.errorBytes;
   }
 
   /**
-   * @return current {@link 
org.apache.reef.runtime.common.driver.evaluator.pojos.State} of a task
+   * @return current state of the evaluator.
    */
-  public State getState(){
-    return evaluatorState;
+  public State getState() {
+    return this.evaluatorState;
   }
-
-  private State getStateFromProto(final 
org.apache.reef.proto.ReefServiceProtos.State protoState) {
-
-    switch (protoState) {
-    case INIT:
-      return State.INIT;
-    case RUNNING:
-      return State.RUNNING;
-    case DONE:
-      return State.DONE;
-    case SUSPEND:
-      return State.SUSPEND;
-    case FAILED:
-      return State.FAILED;
-    case KILLED:
-      return State.KILLED;
-    default:
-      throw new IllegalStateException("Unknown state " + protoState + " in 
EvaluatorStatusProto");
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
index 477564c..6ea7cb1 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
@@ -21,20 +21,127 @@ package 
org.apache.reef.runtime.common.driver.evaluator.pojos;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
 
 /**
  * DriverSide representation of ReefServiceProtos.State.
  */
-
 @DriverSide
 @Private
 public enum State {
 
-    INIT,
-    RUNNING,
-    DONE,
-    SUSPEND,
-    FAILED,
-    KILLED;
+  INIT,
+  RUNNING,
+  SUSPEND,
+  DONE,
+  FAILED,
+  KILLED;
+
+  /**
+   * Get a driver-side state given the proto. It is a 1:1 mapping.
+   * @param protoState remote state from the proto.
+   * @return a corresponding (identical) driver-side state (always a 1:1 
mapping).
+   */
+  public static State fromProto(final ReefServiceProtos.State protoState) {
+    switch (protoState) {
+    case INIT:
+      return INIT;
+    case RUNNING:
+      return RUNNING;
+    case SUSPEND:
+      return SUSPEND;
+    case DONE:
+      return DONE;
+    case FAILED:
+      return FAILED;
+    case KILLED:
+      return KILLED;
+    default:
+      throw new IllegalStateException("Unknown state " + protoState + " in 
EvaluatorStatusProto");
+    }
+  }
+
+  /**
+   * Checks if the ResourceManager can switch from the current state to the 
target state.
+   * See REEF-826 for the state transition matrix.
+   * @param toState state to switch to.
+   * @return true if the transition is legal; false otherwise.
+   */
+  public final boolean isLegalTransition(final State toState) {
+
+    if (this == toState) {
+      return true;
+    }
+
+    switch (this) {
+
+    case INIT:
+      switch (toState) {
+      case RUNNING:
+      case SUSPEND:
+      case DONE:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+      }
+
+    case RUNNING:
+      switch (toState) {
+      case SUSPEND:
+      case DONE:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+      }
+
+    case SUSPEND:
+      switch (toState) {
+      case RUNNING:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+      }
+
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * Check if container is in RUNNING state.
+   * @return true if container is running.
+   */
+  public final boolean isRunning() {
+    return this == RUNNING;
+  }
+
+  /**
+   * Check if container is available - that is, in one of the states INIT, 
RUNNING, or SUSPEND.
+   * @return true if container is available, false if it is closed or in the 
process of being shut down.
+   */
+  public final boolean isAvailable() {
+    return this == INIT || this == RUNNING || this == SUSPEND;
+  }
+
+  /**
+   * Check if the container is stopped. That is, in one of the DONE, FAILED, 
or KILLED states.
+   * @return true if the container is completed, false if it is still 
available or suspended.
+   */
+  public final boolean isCompleted() {
+    return this == DONE || this == FAILED || this == KILLED;
+  }
 
+  /**
+   * Check if the container can be restarted. That is, in one of the INIT, 
RUNNING, or FAILED states.
+   * @return true if the container can be restarted.
+   */
+  public final boolean isRestartable() {
+    return this == INIT || this == RUNNING || this == FAILED;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
index f2e3f2d..3eb4e23 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
@@ -40,79 +40,57 @@ public final class TaskStatusPOJO {
   private final byte[] result;
   private final List<TaskMessagePOJO> taskMessages = new ArrayList<>();
 
-  public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto, final 
long sequenceNumber){
+  public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto, final 
long sequenceNumber) {
 
-    taskId = proto.getTaskId();
-    contextId = proto.getContextId();
-    state = proto.hasState()? getStateFromProto(proto.getState()) : null;
-    result = proto.hasResult() ? proto.getResult().toByteArray() : null;
+    this.taskId = proto.getTaskId();
+    this.contextId = proto.getContextId();
+    this.state = proto.hasState() ? State.fromProto(proto.getState()) : null;
+    this.result = proto.hasResult() ? proto.getResult().toByteArray() : null;
 
     for (final TaskMessageProto taskMessageProto : proto.getTaskMessageList()) 
{
-      taskMessages.add(new TaskMessagePOJO(taskMessageProto, sequenceNumber));
+      this.taskMessages.add(new TaskMessagePOJO(taskMessageProto, 
sequenceNumber));
     }
-
   }
 
   /**
-   * @return a list of messages sent by a task
+   * @return a list of messages sent by a task.
    */
-  public List<TaskMessagePOJO> getTaskMessageList(){
-    return taskMessages;
+  public List<TaskMessagePOJO> getTaskMessageList() {
+    return this.taskMessages;
   }
 
   /**
-   * @return true, if a completed task returned a non-null value in the 
'return' statement
+   * @return true, if a completed task returned a non-null value in the 
'return' statement.
    */
-  public boolean hasResult(){
-    return null != result;
+  public boolean hasResult() {
+    return null != this.result;
   }
 
   /**
-   * @return serialized result that a completed task returned to the Driver
+   * @return serialized result that a completed task returned to the Driver.
    */
-  public byte[] getResult(){
-    return result;
+  public byte[] getResult() {
+    return this.result;
   }
 
   /**
-   * @return the id of a task
+   * @return the id of a task.
    */
-  public String getTaskId(){
-    return taskId;
+  public String getTaskId() {
+    return this.taskId;
   }
 
   /**
-   * @return the id of a context that this task runs within
+   * @return the id of a context that this task runs within.
    */
-  public String getContextId(){
-    return contextId;
+  public String getContextId() {
+    return this.contextId;
   }
 
   /**
-   * @return current {@link 
org.apache.reef.runtime.common.driver.evaluator.pojos.State} of a task
+   * @return current state of a task.
    */
-  public State getState(){
-    return state;
-  }
-
-  private State getStateFromProto(final 
org.apache.reef.proto.ReefServiceProtos.State protoState) {
-
-    switch (protoState) {
-    case INIT:
-      return State.INIT;
-    case RUNNING:
-      return State.RUNNING;
-    case DONE:
-      return State.DONE;
-    case SUSPEND:
-      return State.SUSPEND;
-    case FAILED:
-      return State.FAILED;
-    case KILLED:
-      return State.KILLED;
-    default:
-      throw new IllegalStateException("Unknown state " + protoState + " in 
EvaluatorStatusProto");
-    }
-
+  public State getState() {
+    return this.state;
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
index 285a1b1..2792790 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
@@ -56,7 +56,7 @@ public final class DriverIdleManager {
 
     final DriverStatusManager driverStatusManagerImpl = 
this.driverStatusManager.get();
 
-    if (driverStatusManagerImpl.isShuttingDownOrFailing()) {
+    if (driverStatusManagerImpl.isClosing()) {
       LOG.log(IDLE_REASONS_LEVEL, "Ignoring idle call from [{0}] for reason 
[{1}]",
           new Object[] {reason.getComponentName(), reason.getReason()});
       return;

http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
index f9a526d..4b19330 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
@@ -154,73 +154,15 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
     }
   }
 
-  /**
-  * Checks if the ResourceManager can switch from the current state to the 
target state.
-  * See REEF-826 for the state transition matrix.
-  * @param from current state.
-  * @param to state to switch to.
-  * @return true if the transition is legal; false otherwise.
-  */
-  private static boolean isLegalStateTransition(final State from, final State 
to) {
-
-    // handle diagonal elements of the transition matrix
-    if (from.equals(to)) {
-      LOG.log(Level.FINEST, "Transition from {0} state to the same state.", 
from);
-      return true;
-    }
-
-    // handle non-diagonal elements
-    switch (from) {
-
-    case INIT:
-      switch (to) {
-      case RUNNING:
-      case SUSPEND:
-      case DONE:
-      case FAILED:
-      case KILLED:
-        return true;
-      default:
-        return false;
-      }
-
-    case RUNNING:
-      switch (to) {
-      case SUSPEND:
-      case DONE:
-      case FAILED:
-      case KILLED:
-        return true;
-      default:
-        return false;
-      }
-
-    case SUSPEND:
-      switch (to) {
-      case RUNNING:
-      case FAILED:
-      case KILLED:
-        return true;
-      default:
-        return false;
-      }
-
-    case DONE:
-    case FAILED:
-    case KILLED:
-      return false;
-
-    default:
-      return false;
-    }
-  }
-
-  private synchronized void setState(final State newState) {
-    if (isLegalStateTransition(this.state, newState)) {
-      this.state = newState;
+  private synchronized void setState(final State toState) {
+    if (this.state == toState) {
+      LOG.log(Level.FINE, "Transition from {0} state to the same state.", 
this.state);
+    } else if (this.state.isLegalTransition(toState)) {
+      LOG.log(Level.FINEST, "State transition: {0} -> {1}", new State[] 
{this.state, toState});
+      this.state = toState;
     } else {
       throw new IllegalStateException(
-          "Resource manager attempts illegal state transition from " + 
this.state + " to " + newState);
+          "Resource manager attempts illegal state transition from " + 
this.state + " to " + toState);
     }
   }
 }

Reply via email to