Repository: reef
Updated Branches:
refs/heads/master d25412e9c -> 8c0856114
[REEF-832] Add CLOSING state to EvaluatorState
This addresses the issue by:
* Moving a running evaluator into the CLOSING state, instead of KILLED,
when EvaluatorManager closes it.
* Adding ACK logic from EvaluatorRuntime to EvaluatorManager during killing.
* Moving the evaluator into the KILLED state once EvaluatorManager receives this ACK.
JIRA:
[REEF-832](https://issues.apache.org/jira/browse/REEF-832)
Pull Request:
This closes #1077
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/8c085611
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/8c085611
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/8c085611
Branch: refs/heads/master
Commit: 8c08561146ed5c1c22927ff7cc278dac966e85c9
Parents: d25412e
Author: sanha <[email protected]>
Authored: Thu Aug 18 12:30:24 2016 +0900
Committer: Markus Weimer <[email protected]>
Committed: Thu Sep 8 11:39:41 2016 -0700
----------------------------------------------------------------------
.../apache/reef/driver/task/RunningTask.java | 7 ++
.../driver/evaluator/EvaluatorManager.java | 112 ++++++++++++-------
.../common/driver/evaluator/EvaluatorState.java | 2 +-
.../evaluator/EvaluatorStatusManager.java | 29 ++++-
.../common/driver/evaluator/Evaluators.java | 2 +-
.../common/driver/task/RunningTaskImpl.java | 4 +
.../common/driver/task/TaskRepresenter.java | 16 +++
.../common/evaluator/EvaluatorRuntime.java | 1 +
.../org/apache/reef/tests/AllTestsSuite.java | 2 +
.../evaluatorclose/EvaluatorCloseDriver.java | 85 ++++++++++++++
.../evaluatorclose/EvaluatorCloseTest.java | 75 +++++++++++++
.../evaluatorclose/EvaluatorCloseTestTask.java | 40 +++++++
.../reef/tests/evaluatorclose/package-info.java | 22 ++++
.../EvaluatorFailureDuringAlarmDriver.java | 1 -
14 files changed, 353 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/RunningTask.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/RunningTask.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/RunningTask.java
index d406d80..1453cf7 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/RunningTask.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/RunningTask.java
@@ -23,6 +23,7 @@ import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Public;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.io.naming.Identifiable;
+import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
/**
* Represents a running Task.
@@ -70,4 +71,10 @@ public interface RunningTask extends Identifiable,
AutoCloseable {
*/
@Override
void close();
+
+ /**
+ * Gets the representer of task.
+ * @return the representer of task
+ */
+ TaskRepresenter getTaskRepresenter();
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index c461a5b..d4b8997 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -232,13 +232,16 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
.setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
.build();
sendEvaluatorControlMessage(evaluatorControlProto);
+ this.stateManager.setClosing();
+ } else {
+ this.stateManager.setKilled();
}
- } finally {
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Exception occurred when manager sends
killing message to task.", e);
this.stateManager.setKilled();
}
}
-
if (!this.isResourceReleased) {
this.isResourceReleased = true;
try {
@@ -248,20 +251,20 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
@Override
public void onNext(final Alarm alarm) {
EvaluatorManager.this.resourceReleaseHandler.onNext(
- ResourceReleaseEventImpl.newBuilder()
- .setIdentifier(EvaluatorManager.this.evaluatorId)
-
.setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
- .build()
+ ResourceReleaseEventImpl.newBuilder()
+ .setIdentifier(EvaluatorManager.this.evaluatorId)
+
.setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
+ .build()
);
}
});
} catch (final IllegalStateException e) {
LOG.log(Level.WARNING, "Force resource release because the client
closed the clock.", e);
EvaluatorManager.this.resourceReleaseHandler.onNext(
- ResourceReleaseEventImpl.newBuilder()
- .setIdentifier(EvaluatorManager.this.evaluatorId)
-
.setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
- .build()
+ ResourceReleaseEventImpl.newBuilder()
+ .setIdentifier(EvaluatorManager.this.evaluatorId)
+
.setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
+ .build()
);
}
}
@@ -276,7 +279,21 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
*/
public boolean isClosed() {
return this.messageDispatcher.isEmpty() &&
- this.stateManager.isDoneOrFailedOrKilled();
+ this.stateManager.isDoneOrFailedOrKilled();
+ }
+
+ /**
+ * Return true if the state is CLOSING.
+ */
+ public boolean isClosing() {
+ return this.stateManager.isClosing();
+ }
+
+ /**
+ * Return true if the state is DONE, FAILED, KILLED, or CLOSING.
+ */
+ public boolean isClosedOrClosing() {
+ return isClosed() || isClosing();
}
/**
@@ -352,35 +369,35 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
LOG.log(Level.FINE, "Ignoring a heartbeat received for Evaluator {0}
which is already in state {1}.",
new Object[]{this.getId(), this.stateManager});
return;
- }
-
- this.sanityChecker.check(evaluatorId,
evaluatorHeartbeatProto.getTimestamp());
- final String evaluatorRID =
evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
-
- final EvaluatorRestartState evaluatorRestartState =
driverRestartManager.getEvaluatorRestartState(evaluatorId);
-
- /*
- * First message from a running evaluator. The evaluator can be a new
evaluator or be a previous evaluator
- * from a separate application attempt. In the case of a previous
evaluator, if the restart period has not
- * yet expired, we should register it and trigger context active and
task events. If the restart period has
- * expired, we should return immediately after setting its remote ID in
order to close it.
- */
- if (this.stateManager.isSubmitted() ||
- evaluatorRestartState == EvaluatorRestartState.REPORTED ||
- evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
-
- this.evaluatorControlHandler.setRemoteID(evaluatorRID);
-
- if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
- // Don't do anything if evaluator has expired. Close it immediately
upon exit of this method.
- return;
- }
+ } else if (this.stateManager.isAllocatedOrSubmittedOrRunning()) {
+ this.sanityChecker.check(evaluatorId,
evaluatorHeartbeatProto.getTimestamp());
+ final String evaluatorRID =
evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
+
+ final EvaluatorRestartState evaluatorRestartState =
driverRestartManager.getEvaluatorRestartState(evaluatorId);
+
+ /*
+ * First message from a running evaluator. The evaluator can be a new
evaluator or be a previous evaluator
+ * from a separate application attempt. In the case of a previous
evaluator, if the restart period has not
+ * yet expired, we should register it and trigger context active and
task events. If the restart period has
+ * expired, we should return immediately after setting its remote ID
in order to close it.
+ */
+ if (this.stateManager.isSubmitted() ||
+ evaluatorRestartState == EvaluatorRestartState.REPORTED ||
+ evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
+
+ this.evaluatorControlHandler.setRemoteID(evaluatorRID);
+
+ if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
+ // Don't do anything if evaluator has expired. Close it
immediately upon exit of this method.
+ return;
+ }
- this.stateManager.setRunning();
- LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
+ this.stateManager.setRunning();
+ LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
- if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
- driverRestartManager.setEvaluatorReregistered(evaluatorId);
+ if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
+ driverRestartManager.setEvaluatorReregistered(evaluatorId);
+ }
}
}
@@ -426,8 +443,10 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
case FAILED:
this.onEvaluatorFailed(message);
break;
- case INIT:
case KILLED:
+ this.onEvaluatorKilled(message);
+ break;
+ case INIT:
case RUNNING:
case SUSPEND:
break;
@@ -482,6 +501,19 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
onEvaluatorException(evaluatorException);
}
+ /**
+ * Process an evaluator message that indicates that the evaluator completed
the unclean shut down request.
+ *
+ * @param message
+ */
+ private synchronized void onEvaluatorKilled(final EvaluatorStatusPOJO
message) {
+ assert message.getState() == State.KILLED;
+ assert stateManager.isClosing();
+ LOG.log(Level.WARNING, "Evaluator {0} killed completely.", getId());
+
+ this.stateManager.setKilled();
+ }
+
public void onResourceLaunch(final ResourceLaunchEvent resourceLaunchEvent) {
synchronized (this.evaluatorDescriptor) {
if (this.stateManager.isAllocated()) {
@@ -570,7 +602,7 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
public void onResourceStatusMessage(final ResourceStatusEvent
resourceStatusEvent) {
synchronized (this.evaluatorDescriptor) {
LOG.log(Level.FINEST, "Resource manager state update: {0}",
resourceStatusEvent.getState());
- if (this.stateManager.isDoneOrFailedOrKilled()) {
+ if (!this.stateManager.isAllocatedOrSubmittedOrRunning()) {
LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0}
which is already in state {1}.",
new Object[]{this.getId(), this.stateManager});
} else if (isDoneOrFailedOrKilled(resourceStatusEvent) &&
this.stateManager.isAllocatedOrSubmittedOrRunning()) {
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
index 6d9a26c..653486e 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
@@ -31,7 +31,7 @@ enum EvaluatorState {
ALLOCATED, // initial state
SUBMITTED, // client called AllocatedEvaluator.submitTask() and we're
waiting for first contact
RUNNING, // first contact received, all communication channels
established, Evaluator sent to client.
- // TODO[JIRA REEF-832]: Add CLOSING state
+ CLOSING, // evaluator is asked shutdown, but not closed yet.
DONE, // clean shutdown
FAILED, // some failure occurred.
KILLED // unclean shutdown
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
index 5d5d1f3..a2b249a 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
@@ -52,9 +52,10 @@ final class EvaluatorStatusManager {
switch(to) {
case SUBMITTED:
case DONE:
+ case CLOSING:
case FAILED:
- case KILLED:
return true;
+ case KILLED:
case RUNNING:
break;
default:
@@ -65,10 +66,11 @@ final class EvaluatorStatusManager {
switch(to) {
case RUNNING:
case DONE:
+ case CLOSING:
case FAILED:
- case KILLED:
return true;
case ALLOCATED:
+ case KILLED:
break;
default:
throw new RuntimeException("Unknown state: " + to);
@@ -77,11 +79,26 @@ final class EvaluatorStatusManager {
case RUNNING: {
switch(to) {
case DONE:
+ case CLOSING:
case FAILED:
+ return true;
+ case ALLOCATED:
+ case SUBMITTED:
case KILLED:
+ break;
+ default:
+ throw new RuntimeException("Unknown state: " + to);
+ }
+ }
+ case CLOSING: {
+ switch(to) {
+ case KILLED:
+ case DONE:
+ case FAILED:
return true;
case ALLOCATED:
case SUBMITTED:
+ case RUNNING:
break;
default:
throw new RuntimeException("Unknown state: " + to);
@@ -113,6 +130,10 @@ final class EvaluatorStatusManager {
this.setState(EvaluatorState.SUBMITTED);
}
+ synchronized void setClosing() {
+ this.setState(EvaluatorState.CLOSING);
+ }
+
synchronized void setDone() {
this.setState(EvaluatorState.DONE);
}
@@ -151,6 +172,10 @@ final class EvaluatorStatusManager {
return EvaluatorState.FAILED == this.state || EvaluatorState.KILLED ==
this.state;
}
+ synchronized boolean isClosing() {
+ return EvaluatorState.CLOSING == this.state;
+ }
+
@Override
public synchronized String toString() {
return this.state.toString();
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
index 5bc0645..fac1ab4 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
@@ -67,7 +67,7 @@ public final class Evaluators implements AutoCloseable {
}
for (final EvaluatorManager evaluatorManager : evaluatorsCopy) {
LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}",
evaluatorManager.getId());
- if (!evaluatorManager.isClosed()) {
+ if (!evaluatorManager.isClosedOrClosing()) {
evaluatorManager.close();
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
index e00579d..dfa20cb 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
@@ -138,4 +138,8 @@ public final class RunningTaskImpl implements RunningTask {
public String toString() {
return "RunningTask{taskId='" + taskId + "'}";
}
+
+ public TaskRepresenter getTaskRepresenter() {
+ return taskRepresenter;
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
index 54fce7e..1c51870 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -220,4 +220,20 @@ public final class TaskRepresenter {
new Object[]{this.taskId, this.state, newState});
this.state = newState;
}
+
+ /**
+ * Check whether this evaluator is in closing state.
+ * @return whether this evaluator is in closing state.
+ */
+ public boolean evaluatorIsClosing() {
+ return evaluatorManager.isClosing();
+ }
+
+ /**
+ * Check whether this evaluator is in closed state.
+ * @return whether this evaluator is in closed state.
+ */
+ public boolean evaluatorIsClosed() {
+ return evaluatorManager.isClosed();
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
index 1384556..74c4f38 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
@@ -130,6 +130,7 @@ final class EvaluatorRuntime implements
EventHandler<EvaluatorControlProto> {
if (message.hasKillEvaluator()) {
LOG.log(Level.SEVERE, "Evaluator {0} has been killed by the
driver.", this.evaluatorIdentifier);
this.state = ReefServiceProtos.State.KILLED;
+ this.heartBeatManager.sendEvaluatorStatus(this.getEvaluatorStatus());
this.clock.close();
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java
index d6efa30..94f04cf 100644
---
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java
+++
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java
@@ -22,6 +22,7 @@ import
org.apache.reef.tests.applications.ApplicationTestSuite;
import org.apache.reef.tests.close_eval.CloseEvaluatorTest;
import org.apache.reef.tests.configurationproviders.ConfigurationProviderTest;
import org.apache.reef.tests.driver.DriverTest;
+import org.apache.reef.tests.evaluatorclose.EvaluatorCloseTest;
import org.apache.reef.tests.runtimename.RuntimeNameTest;
import org.apache.reef.tests.evaluatorfailure.EvaluatorFailureTest;
import org.apache.reef.tests.evaluatorreuse.EvaluatorReuseTest;
@@ -60,6 +61,7 @@ import org.junit.runners.Suite;
ApplicationTestSuite.class,
RuntimeNameTest.class,
WatcherTest.class,
+ EvaluatorCloseTest.class,
})
public final class AllTestsSuite {
}
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseDriver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseDriver.java
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseDriver.java
new file mode 100644
index 0000000..e800cf0
--- /dev/null
+++
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseDriver.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluatorclose;
+
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tests.library.exceptions.DriverSideFailure;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+final class EvaluatorCloseDriver {
+ private static final Logger LOG =
Logger.getLogger(EvaluatorCloseDriver.class.getName());
+ private final AtomicBoolean changedToClosing = new AtomicBoolean(false);
+ private TaskRepresenter taskRepresenter;
+ private AllocatedEvaluator evaluator;
+
+ @Inject
+ EvaluatorCloseDriver() {
+ }
+
+ final class EvaluatorAllocatedHandler implements
EventHandler<AllocatedEvaluator> {
+
+ @Override
+ public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+ LOG.log(Level.FINE, "Received a AllocatedEvaluator for Evaluator {0}",
allocatedEvaluator.getId());
+ evaluator = allocatedEvaluator;
+ final Configuration taskConfiguration = TaskConfiguration.CONF
+ .set(TaskConfiguration.IDENTIFIER, "EvaluatorCloseTestTask")
+ .set(TaskConfiguration.TASK, EvaluatorCloseTestTask.class)
+ .build();
+ allocatedEvaluator.submitTask(taskConfiguration);
+ }
+ }
+
+ final class TaskRunningHandler implements EventHandler<RunningTask> {
+
+ @Override
+ public void onNext(final RunningTask runningTask) {
+ LOG.log(Level.FINE, "Received a RunningTask on Evaluator {0}",
runningTask.getActiveContext().getEvaluatorId());
+ taskRepresenter = runningTask.getTaskRepresenter();
+ evaluator.close();
+ changedToClosing.set(taskRepresenter.evaluatorIsClosing());
+ }
+ }
+
+ final class StopHandler implements EventHandler<StopTime> {
+
+ @Override
+ public void onNext(final StopTime stopTime) {
+ if (!changedToClosing.get()) {
+ throw new DriverSideFailure("Evaluator's state was not changed to
closing.");
+ } else if (!taskRepresenter.evaluatorIsClosed()){
+ throw new DriverSideFailure("Evaluator's state was not changed to
closed after completion.");
+ } else {
+ LOG.log(Level.FINEST, "Evaluator state was changed properly (RUNNING
-> CLOSING -> CLOSED).");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTest.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTest.java
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTest.java
new file mode 100644
index 0000000..d6a4214
--- /dev/null
+++
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluatorclose;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.tests.library.driver.OnDriverStartedAllocateOne;
+import org.apache.reef.util.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests whether evaluator's state is properly changed during closing.
+ */
+public class EvaluatorCloseTest {
+
+ private final TestEnvironment testEnvironment =
TestEnvironmentFactory.getNewTestEnvironment();
+
+ @Before
+ public void setUp() throws Exception {
+ testEnvironment.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ this.testEnvironment.tearDown();
+ }
+
+ /**
+ * Tests that an evaluator's state is changed to closing and closed in order
during closing.
+ */
+ @Test
+ public void testEvaluatorClosingState() throws InjectionException {
+ final Configuration runtimeConfiguration =
this.testEnvironment.getRuntimeConfiguration();
+
+ final Configuration driverConfiguration = DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES,
+ EnvironmentUtils.getClassLocation(EvaluatorCloseDriver.class))
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_EvaluatorCloseTest")
+ .set(DriverConfiguration.ON_DRIVER_STARTED,
OnDriverStartedAllocateOne.class)
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED,
EvaluatorCloseDriver.EvaluatorAllocatedHandler.class)
+ .set(DriverConfiguration.ON_TASK_RUNNING,
EvaluatorCloseDriver.TaskRunningHandler.class)
+ .set(DriverConfiguration.ON_DRIVER_STOP,
EvaluatorCloseDriver.StopHandler.class)
+ .build();
+
+ final LauncherStatus status =
DriverLauncher.getLauncher(runtimeConfiguration)
+ .run(driverConfiguration, this.testEnvironment.getTestTimeout());
+
+ Assert.assertTrue("The result state of evaluator closing state test is " +
status,
+ status.isSuccess());
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTestTask.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTestTask.java
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTestTask.java
new file mode 100644
index 0000000..ad234e4
--- /dev/null
+++
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/EvaluatorCloseTestTask.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluatorclose;
+
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * The task for testing the evaluator's state changing during close.
+ */
+final class EvaluatorCloseTestTask implements Task {
+
+ @Inject
+ EvaluatorCloseTestTask() {
+ }
+
+ @Override
+ public byte[] call(final byte[] memento) {
+ while(true) {
+ // Busy loop
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/package-info.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/package-info.java
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/package-info.java
new file mode 100644
index 0000000..948ac7e
--- /dev/null
+++
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorclose/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests of closing evaluator.
+ */
+package org.apache.reef.tests.evaluatorclose;
http://git-wip-us.apache.org/repos/asf/reef/blob/8c085611/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorfailure/EvaluatorFailureDuringAlarmDriver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorfailure/EvaluatorFailureDuringAlarmDriver.java
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorfailure/EvaluatorFailureDuringAlarmDriver.java
index e76e2c4..b07933a 100644
---
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorfailure/EvaluatorFailureDuringAlarmDriver.java
+++
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/evaluatorfailure/EvaluatorFailureDuringAlarmDriver.java
@@ -91,7 +91,6 @@ final class EvaluatorFailureDuringAlarmDriver {
public void onNext(final FailedTask failedTask) {
LOG.log(Level.SEVERE, "Received FailedTask: {0}", failedTask);
otherFailuresReceived.set(true);
-
}
}