Repository: reef
Updated Branches:
  refs/heads/master d25412e9c -> 8c0856114


[REEF-832] Add CLOSING state to EvaluatorState

This addressed the issue by

  * When EvaluatorManager close a running evaluator, it turn into CLOSING state
    instead of KILLED.
  * Adding ACK logic from EvaluatorRuntime to EvaluatorManager during killing.
  * After EvaluatorManager ACK this message, it turn into KILLED state.

JIRA:
  [REEF-832](https://issues.apache.org/jira/browse/REEF-832)

Pull Request:
  This closes #1077


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/8c085611
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/8c085611
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/8c085611

Branch: refs/heads/master
Commit: 8c08561146ed5c1c22927ff7cc278dac966e85c9
Parents: d25412e
Author: sanha <[email protected]>
Authored: Thu Aug 18 12:30:24 2016 +0900
Committer: Markus Weimer <[email protected]>
Committed: Thu Sep 8 11:39:41 2016 -0700

----------------------------------------------------------------------
 .../apache/reef/driver/task/RunningTask.java    |   7 ++
 .../driver/evaluator/EvaluatorManager.java      | 112 ++++++++++++-------
 .../common/driver/evaluator/EvaluatorState.java |   2 +-
 .../evaluator/EvaluatorStatusManager.java       |  29 ++++-
 .../common/driver/evaluator/Evaluators.java     |   2 +-
 .../common/driver/task/RunningTaskImpl.java     |   4 +
 .../common/driver/task/TaskRepresenter.java     |  16 +++
 .../common/evaluator/EvaluatorRuntime.java      |   1 +
 .../org/apache/reef/tests/AllTestsSuite.java    |   2 +
 .../evaluatorclose/EvaluatorCloseDriver.java    |  85 ++++++++++++++
 .../evaluatorclose/EvaluatorCloseTest.java      |  75 +++++++++++++
 .../evaluatorclose/EvaluatorCloseTestTask.java  |  40 +++++++
 .../reef/tests/evaluatorclose/package-info.java |  22 ++++
 .../EvaluatorFailureDuringAlarmDriver.java      |   1 -
 14 files changed, 353 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/RunningTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/RunningTask.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/RunningTask.java
index d406d80..1453cf7 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/RunningTask.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/RunningTask.java
@@ -23,6 +23,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.io.naming.Identifiable;
+import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
 
 /**
  * Represents a running Task.
@@ -70,4 +71,10 @@ public interface RunningTask extends Identifiable, 
AutoCloseable {
    */
   @Override
   void close();
+
+  /**
+   * Gets the representer of task.
+   * @return the representer of task
+   */
+  TaskRepresenter getTaskRepresenter();
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index c461a5b..d4b8997 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -232,13 +232,16 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
                     
.setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
                     .build();
             sendEvaluatorControlMessage(evaluatorControlProto);
+            this.stateManager.setClosing();
+          } else {
+            this.stateManager.setKilled();
           }
-        } finally {
+        } catch (Exception e) {
+          LOG.log(Level.WARNING, "Exception occurred when manager sends 
killing message to task.", e);
           this.stateManager.setKilled();
         }
       }
 
-
       if (!this.isResourceReleased) {
         this.isResourceReleased = true;
         try {
@@ -248,20 +251,20 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
             @Override
             public void onNext(final Alarm alarm) {
               EvaluatorManager.this.resourceReleaseHandler.onNext(
-                      ResourceReleaseEventImpl.newBuilder()
-                              .setIdentifier(EvaluatorManager.this.evaluatorId)
-                              
.setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
-                              .build()
+                  ResourceReleaseEventImpl.newBuilder()
+                      .setIdentifier(EvaluatorManager.this.evaluatorId)
+                      
.setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
+                      .build()
               );
             }
           });
         } catch (final IllegalStateException e) {
           LOG.log(Level.WARNING, "Force resource release because the client 
closed the clock.", e);
           EvaluatorManager.this.resourceReleaseHandler.onNext(
-                  ResourceReleaseEventImpl.newBuilder()
-                          .setIdentifier(EvaluatorManager.this.evaluatorId)
-                          
.setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
-                          .build()
+              ResourceReleaseEventImpl.newBuilder()
+                  .setIdentifier(EvaluatorManager.this.evaluatorId)
+                  
.setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
+                  .build()
           );
         }
       }
@@ -276,7 +279,21 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
    */
   public boolean isClosed() {
     return this.messageDispatcher.isEmpty() &&
-           this.stateManager.isDoneOrFailedOrKilled();
+        this.stateManager.isDoneOrFailedOrKilled();
+  }
+
+  /**
+   * Return true if the state is CLOSING.
+   */
+  public boolean isClosing() {
+    return this.stateManager.isClosing();
+  }
+
+  /**
+   * Return true if the state is DONE, FAILED, KILLED, or CLOSING.
+   */
+  public boolean isClosedOrClosing() {
+    return isClosed() || isClosing();
   }
 
   /**
@@ -352,35 +369,35 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
         LOG.log(Level.FINE, "Ignoring a heartbeat received for Evaluator {0} 
which is already in state {1}.",
             new Object[]{this.getId(), this.stateManager});
         return;
-      }
-
-      this.sanityChecker.check(evaluatorId, 
evaluatorHeartbeatProto.getTimestamp());
-      final String evaluatorRID = 
evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
-
-      final EvaluatorRestartState evaluatorRestartState = 
driverRestartManager.getEvaluatorRestartState(evaluatorId);
-
-      /*
-       * First message from a running evaluator. The evaluator can be a new 
evaluator or be a previous evaluator
-       * from a separate application attempt. In the case of a previous 
evaluator, if the restart period has not
-       * yet expired, we should register it and trigger context active and 
task events. If the restart period has
-       * expired, we should return immediately after setting its remote ID in 
order to close it.
-       */
-      if (this.stateManager.isSubmitted() ||
-          evaluatorRestartState == EvaluatorRestartState.REPORTED ||
-          evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
-
-        this.evaluatorControlHandler.setRemoteID(evaluatorRID);
-
-        if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
-          // Don't do anything if evaluator has expired. Close it immediately 
upon exit of this method.
-          return;
-        }
+      } else if (this.stateManager.isAllocatedOrSubmittedOrRunning()) {
+        this.sanityChecker.check(evaluatorId, 
evaluatorHeartbeatProto.getTimestamp());
+        final String evaluatorRID = 
evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
+
+        final EvaluatorRestartState evaluatorRestartState = 
driverRestartManager.getEvaluatorRestartState(evaluatorId);
+
+        /*
+         * First message from a running evaluator. The evaluator can be a new 
evaluator or be a previous evaluator
+         * from a separate application attempt. In the case of a previous 
evaluator, if the restart period has not
+         * yet expired, we should register it and trigger context active and 
task events. If the restart period has
+         * expired, we should return immediately after setting its remote ID 
in order to close it.
+         */
+        if (this.stateManager.isSubmitted() ||
+            evaluatorRestartState == EvaluatorRestartState.REPORTED ||
+            evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
+
+          this.evaluatorControlHandler.setRemoteID(evaluatorRID);
+
+          if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
+            // Don't do anything if evaluator has expired. Close it 
immediately upon exit of this method.
+            return;
+          }
 
-        this.stateManager.setRunning();
-        LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
+          this.stateManager.setRunning();
+          LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
 
-        if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
-          driverRestartManager.setEvaluatorReregistered(evaluatorId);
+          if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
+            driverRestartManager.setEvaluatorReregistered(evaluatorId);
+          }
         }
       }
 
@@ -426,8 +443,10 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
     case FAILED:
       this.onEvaluatorFailed(message);
       break;
-    case INIT:
     case KILLED:
+      this.onEvaluatorKilled(message);
+      break;
+    case INIT:
     case RUNNING:
     case SUSPEND:
       break;
@@ -482,6 +501,19 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
     onEvaluatorException(evaluatorException);
   }
 
+  /**
+   * Process an evaluator message that indicates that the evaluator completed 
the unclean shut down request.
+   *
+   * @param message
+   */
+  private synchronized void onEvaluatorKilled(final EvaluatorStatusPOJO 
message) {
+    assert message.getState() == State.KILLED;
+    assert stateManager.isClosing();
+    LOG.log(Level.WARNING, "Evaluator {0} killed completely.", getId());
+
+    this.stateManager.setKilled();
+  }
+
   public void onResourceLaunch(final ResourceLaunchEvent resourceLaunchEvent) {
     synchronized (this.evaluatorDescriptor) {
       if (this.stateManager.isAllocated()) {
@@ -570,7 +602,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
   public void onResourceStatusMessage(final ResourceStatusEvent 
resourceStatusEvent) {
     synchronized (this.evaluatorDescriptor) {
       LOG.log(Level.FINEST, "Resource manager state update: {0}", 
resourceStatusEvent.getState());
-      if (this.stateManager.isDoneOrFailedOrKilled()) {
+      if (!this.stateManager.isAllocatedOrSubmittedOrRunning()) {
         LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} 
which is already in state {1}.",
             new Object[]{this.getId(), this.stateManager});
       } else if (isDoneOrFailedOrKilled(resourceStatusEvent) && 
this.stateManager.isAllocatedOrSubmittedOrRunning()) {

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
index 6d9a26c..653486e 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
@@ -31,7 +31,7 @@ enum EvaluatorState {
   ALLOCATED,  // initial state
   SUBMITTED,  // client called AllocatedEvaluator.submitTask() and we're 
waiting for first contact
   RUNNING,    // first contact received, all communication channels 
established, Evaluator sent to client.
-  // TODO[JIRA REEF-832]: Add CLOSING state
+  CLOSING,    // evaluator is asked shutdown, but not closed yet.
   DONE,       // clean shutdown
   FAILED,     // some failure occurred.
   KILLED      // unclean shutdown

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
index 5d5d1f3..a2b249a 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
@@ -52,9 +52,10 @@ final class EvaluatorStatusManager {
       switch(to) {
       case SUBMITTED:
       case DONE:
+      case CLOSING:
       case FAILED:
-      case KILLED:
         return true;
+      case KILLED:
       case RUNNING:
         break;
       default:
@@ -65,10 +66,11 @@ final class EvaluatorStatusManager {
       switch(to) {
       case RUNNING:
       case DONE:
+      case CLOSING:
       case FAILED:
-      case KILLED:
         return true;
       case ALLOCATED:
+      case KILLED:
         break;
       default:
         throw new RuntimeException("Unknown state: " + to);
@@ -77,11 +79,26 @@ final class EvaluatorStatusManager {
     case RUNNING: {
       switch(to) {
       case DONE:
+      case CLOSING:
       case FAILED:
+        return true;
+      case ALLOCATED:
+      case SUBMITTED:
       case KILLED:
+        break;
+      default:
+        throw new RuntimeException("Unknown state: " + to);
+      }
+    }
+    case CLOSING: {
+      switch(to) {
+      case KILLED:
+      case DONE:
+      case FAILED:
         return true;
       case ALLOCATED:
       case SUBMITTED:
+      case RUNNING:
         break;
       default:
         throw new RuntimeException("Unknown state: " + to);
@@ -113,6 +130,10 @@ final class EvaluatorStatusManager {
     this.setState(EvaluatorState.SUBMITTED);
   }
 
+  synchronized void setClosing() {
+    this.setState(EvaluatorState.CLOSING);
+  }
+
   synchronized void setDone() {
     this.setState(EvaluatorState.DONE);
   }
@@ -151,6 +172,10 @@ final class EvaluatorStatusManager {
     return EvaluatorState.FAILED == this.state || EvaluatorState.KILLED == 
this.state;
   }
 
+  synchronized boolean isClosing() {
+    return EvaluatorState.CLOSING == this.state;
+  }
+
   @Override
   public synchronized String toString() {
     return this.state.toString();

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
index 5bc0645..fac1ab4 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
@@ -67,7 +67,7 @@ public final class Evaluators implements AutoCloseable {
     }
     for (final EvaluatorManager evaluatorManager : evaluatorsCopy) {
       LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", 
evaluatorManager.getId());
-      if (!evaluatorManager.isClosed()) {
+      if (!evaluatorManager.isClosedOrClosing()) {
         evaluatorManager.close();
       }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
index e00579d..dfa20cb 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
@@ -138,4 +138,8 @@ public final class RunningTaskImpl implements RunningTask {
   public String toString() {
     return "RunningTask{taskId='" + taskId + "'}";
   }
+
+  public TaskRepresenter getTaskRepresenter() {
+    return taskRepresenter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
index 54fce7e..1c51870 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -220,4 +220,20 @@ public final class TaskRepresenter {
         new Object[]{this.taskId, this.state, newState});
     this.state = newState;
   }
+
+  /**
+   * Check whether this evaluator is in closing state.
+   * @return whether this evaluator is in closing state.
+   */
+  public boolean evaluatorIsClosing() {
+    return evaluatorManager.isClosing();
+  }
+
+  /**
+   * Check whether this evaluator is in closed state.
+   * @return whether this evaluator is in closed state.
+   */
+  public boolean evaluatorIsClosed() {
+    return evaluatorManager.isClosed();
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
index 1384556..74c4f38 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
@@ -130,6 +130,7 @@ final class EvaluatorRuntime implements 
EventHandler<EvaluatorControlProto> {
         if (message.hasKillEvaluator()) {
           LOG.log(Level.SEVERE, "Evaluator {0} has been killed by the 
driver.", this.evaluatorIdentifier);
           this.state = ReefServiceProtos.State.KILLED;
+          this.heartBeatManager.sendEvaluatorStatus(this.getEvaluatorStatus());
           this.clock.close();
         }
       }

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java
index d6efa30..94f04cf 100644
--- 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java
@@ -22,6 +22,7 @@ import 
org.apache.reef.tests.applications.ApplicationTestSuite;
 import org.apache.reef.tests.close_eval.CloseEvaluatorTest;
 import org.apache.reef.tests.configurationproviders.ConfigurationProviderTest;
 import org.apache.reef.tests.driver.DriverTest;
+import org.apache.reef.tests.evaluatorclose.EvaluatorCloseTest;
 import org.apache.reef.tests.runtimename.RuntimeNameTest;
 import org.apache.reef.tests.evaluatorfailure.EvaluatorFailureTest;
 import org.apache.reef.tests.evaluatorreuse.EvaluatorReuseTest;
@@ -60,6 +61,7 @@ import org.junit.runners.Suite;
     ApplicationTestSuite.class,
     RuntimeNameTest.class,
     WatcherTest.class,
+    EvaluatorCloseTest.class,
     })
 public final class AllTestsSuite {
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseDriver.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseDriver.java
new file mode 100644
index 0000000..e800cf0
--- /dev/null
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseDriver.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluatorclose;
+
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tests.library.exceptions.DriverSideFailure;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+final class EvaluatorCloseDriver {
+  private static final Logger LOG = 
Logger.getLogger(EvaluatorCloseDriver.class.getName());
+  private final AtomicBoolean changedToClosing = new AtomicBoolean(false);
+  private TaskRepresenter taskRepresenter;
+  private AllocatedEvaluator evaluator;
+
+  @Inject
+  EvaluatorCloseDriver() {
+  }
+
+  final class EvaluatorAllocatedHandler implements 
EventHandler<AllocatedEvaluator> {
+
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      LOG.log(Level.FINE, "Received a AllocatedEvaluator for Evaluator {0}", 
allocatedEvaluator.getId());
+      evaluator = allocatedEvaluator;
+      final Configuration taskConfiguration = TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, "EvaluatorCloseTestTask")
+          .set(TaskConfiguration.TASK, EvaluatorCloseTestTask.class)
+          .build();
+      allocatedEvaluator.submitTask(taskConfiguration);
+    }
+  }
+
+  final class TaskRunningHandler implements EventHandler<RunningTask> {
+
+    @Override
+    public void onNext(final RunningTask runningTask) {
+      LOG.log(Level.FINE, "Received a RunningTask on Evaluator {0}", 
runningTask.getActiveContext().getEvaluatorId());
+      taskRepresenter = runningTask.getTaskRepresenter();
+      evaluator.close();
+      changedToClosing.set(taskRepresenter.evaluatorIsClosing());
+    }
+  }
+
+  final class StopHandler implements EventHandler<StopTime> {
+
+    @Override
+    public void onNext(final StopTime stopTime) {
+      if (!changedToClosing.get()) {
+        throw new DriverSideFailure("Evaluator's state was not changed to 
closing.");
+      } else if (!taskRepresenter.evaluatorIsClosed()){
+        throw new DriverSideFailure("Evaluator's state was not changed to 
closed after completion.");
+      } else {
+        LOG.log(Level.FINEST, "Evaluator state was changed properly (RUNNING 
-> CLOSING -> CLOSED).");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTest.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTest.java
new file mode 100644
index 0000000..d6a4214
--- /dev/null
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluatorclose;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.tests.library.driver.OnDriverStartedAllocateOne;
+import org.apache.reef.util.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests whether evaluator's state is properly changed during closing.
+ */
+public class EvaluatorCloseTest {
+
+  private final TestEnvironment testEnvironment = 
TestEnvironmentFactory.getNewTestEnvironment();
+
+  @Before
+  public void setUp() throws Exception {
+    testEnvironment.setUp();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    this.testEnvironment.tearDown();
+  }
+
+  /**
+   * Tests that an evaluator's state is changed to closing and closed in order 
during closing.
+   */
+  @Test
+  public void testEvaluatorClosingState() throws InjectionException {
+    final Configuration runtimeConfiguration = 
this.testEnvironment.getRuntimeConfiguration();
+
+    final Configuration driverConfiguration = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES,
+            EnvironmentUtils.getClassLocation(EvaluatorCloseDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_EvaluatorCloseTest")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, 
OnDriverStartedAllocateOne.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, 
EvaluatorCloseDriver.EvaluatorAllocatedHandler.class)
+        .set(DriverConfiguration.ON_TASK_RUNNING, 
EvaluatorCloseDriver.TaskRunningHandler.class)
+        .set(DriverConfiguration.ON_DRIVER_STOP, 
EvaluatorCloseDriver.StopHandler.class)
+        .build();
+
+    final LauncherStatus status = 
DriverLauncher.getLauncher(runtimeConfiguration)
+        .run(driverConfiguration, this.testEnvironment.getTestTimeout());
+
+    Assert.assertTrue("The result state of evaluator closing state test is " + 
status,
+        status.isSuccess());
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTestTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTestTask.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTestTask.java
new file mode 100644
index 0000000..ad234e4
--- /dev/null
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTestTask.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluatorclose;
+
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * The task for testing the evaluator's state changing during close.
+ */
+final class EvaluatorCloseTestTask implements Task {
+
+  @Inject
+  EvaluatorCloseTestTask() {
+  }
+
+  @Override
+  public byte[] call(final byte[] memento) {
+    while(true) {
+      // Busy loop
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/package-info.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/package-info.java
new file mode 100644
index 0000000..948ac7e
--- /dev/null
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests of closing evaluator.
+ */
+package org.apache.reef.tests.evaluatorclose;

http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorfailure/EvaluatorFailureDuringAlarmDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorfailure/EvaluatorFailureDuringAlarmDriver.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorfailure/EvaluatorFailureDuringAlarmDriver.java
index e76e2c4..b07933a 100644
--- 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorfailure/EvaluatorFailureDuringAlarmDriver.java
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorfailure/EvaluatorFailureDuringAlarmDriver.java
@@ -91,7 +91,6 @@ final class EvaluatorFailureDuringAlarmDriver {
     public void onNext(final FailedTask failedTask) {
       LOG.log(Level.SEVERE, "Received FailedTask: {0}", failedTask);
       otherFailuresReceived.set(true);
-
     }
   }
 

Reply via email to