Repository: incubator-reef Updated Branches: refs/heads/master 76e4a4813 -> 09056ec51
[REEF-865] RunningTaskImpl should close tasks in both INIT and RUNNING status This PR resolves the frequent failures of the FailTask test suite in the Docker environment. * Add `isClosable` function to TaskRepresenter * Change close() and close(message) to close a task with INIT or RUNNING status * Add documentation for the FailTask test suite. JIRA: [REEF-865](https://issues.apache.org/jira/browse/REEF-865) Pull Request: This closes #584 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/09056ec5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/09056ec5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/09056ec5 Branch: refs/heads/master Commit: 09056ec51969c08704555066aece246387cc80c5 Parents: 76e4a48 Author: Dongjoon Hyun <dongj...@apache.org> Authored: Thu Oct 22 20:58:36 2015 +0900 Committer: Markus Weimer <git...@weimo.de> Committed: Sun Oct 25 19:36:02 2015 +0100 ---------------------------------------------------------------------- .../common/driver/task/RunningTaskImpl.java | 20 ++++++++++---------- .../common/driver/task/TaskRepresenter.java | 7 +++++++ .../reef/tests/fail/task/package-info.java | 2 +- 3 files changed, 18 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/09056ec5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java index 89a46df..76193f2 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java +++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java @@ -86,13 +86,13 @@ public final class RunningTaskImpl implements RunningTask { public void close() { LOG.log(Level.FINEST, "CLOSE: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "]"); - if (this.taskRepresenter.isNotRunning()) { - LOG.log(Level.FINE, "Ignoring call to .close() because the task is no longer RUNNING."); - } else { + if (this.taskRepresenter.isClosable()) { final ContextControlProto contextControlProto = ContextControlProto.newBuilder() .setStopTask(StopTaskProto.newBuilder().build()) .build(); this.evaluatorManager.sendContextControlMessage(contextControlProto); + } else { + LOG.log(Level.FINE, "Ignoring call to .close() because the task is no longer RUNNING."); } } @@ -100,15 +100,15 @@ public final class RunningTaskImpl implements RunningTask { public void close(final byte[] message) { LOG.log(Level.FINEST, "CLOSE: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "] with message."); - if (this.taskRepresenter.isNotRunning()) { + if (this.taskRepresenter.isClosable()) { + final ContextControlProto contextControlProto = ContextControlProto.newBuilder() + .setStopTask(StopTaskProto.newBuilder().build()) + .setTaskMessage(ByteString.copyFrom(message)) + .build(); + this.evaluatorManager.sendContextControlMessage(contextControlProto); + } else { throw new RuntimeException("Trying to send a message to a Task that is no longer RUNNING."); } - - final ContextControlProto contextControlProto = ContextControlProto.newBuilder() - .setStopTask(StopTaskProto.newBuilder().build()) - .setTaskMessage(ByteString.copyFrom(message)) - .build(); - this.evaluatorManager.sendContextControlMessage(contextControlProto); } @Override http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/09056ec5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java index 0107878..3f70817 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java @@ -202,6 +202,13 @@ public final class TaskRepresenter { return this.state != ReefServiceProtos.State.RUNNING; } + /** + * @return true, if this task is in INIT or RUNNING status. + */ + public boolean isClosable() { + return this.state == ReefServiceProtos.State.INIT || this.state == ReefServiceProtos.State.RUNNING; + } + private void setState(final ReefServiceProtos.State newState) { LOG.log(Level.FINE, "Task [{0}] state transition from [{1}] to [{2}]", new Object[]{this.taskId, this.state, newState}); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/09056ec5/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/package-info.java index 8d3c9fa..974749e 100644 --- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/package-info.java +++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/package-info.java @@ -17,6 +17,6 @@ * under the License. */ /** - * TODO: Document. + * Tests for Tasks fail in various cases. */ package org.apache.reef.tests.fail.task;