Repository: reef Updated Branches: refs/heads/master 1356dd118 -> 7495f78a2
[REEF-1040] Fix a bug in WatcherTest The issue is addressed by following changes: * Change TaskRepresenter to make RunningTask event handlers be called when the first RUNNING message arrived from Evaluator * Immediately send a heartbeat message to the driver when task state is change to RUNNING in the evaluator * Add WatcherTest to AllTestSuite JIRA: [REEF-1040](https://issues.apache.org/jira/browse/REEF-1040) Pull Request: Closes #845 Author: Geon-Woo Kim ([email protected]) Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/7495f78a Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/7495f78a Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/7495f78a Branch: refs/heads/master Commit: 7495f78a2a4311574f2c237a8e2927856b78827b Parents: 1356dd1 Author: Geon-Woo Kim <[email protected]> Authored: Thu Feb 18 04:15:48 2016 +0900 Committer: Andrew Chung <[email protected]> Committed: Sat Feb 20 09:23:38 2016 -0800 ---------------------------------------------------------------------- .../reef/runtime/common/driver/task/TaskRepresenter.java | 11 ++++++++--- .../reef/runtime/common/evaluator/task/TaskStatus.java | 1 + .../test/java/org/apache/reef/tests/AllTestsSuite.java | 4 +++- 3 files changed, 12 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/7495f78a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java index fc3d6e5..54fce7e 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java @@ -55,6 +55,7 @@ public final class TaskRepresenter { // Mutable state private State state = State.INIT; + private boolean isFirstRunningMessage = true; public TaskRepresenter(final String taskId, final EvaluatorContext context, @@ -125,9 +126,6 @@ public final class TaskRepresenter { LOG.log(Level.WARNING, "Received a INIT message for task with id {0}" + " which we have seen before. Ignoring the second message", this.taskId); } else { - final RunningTask runningTask = new RunningTaskImpl( - this.evaluatorManager, this.taskId, this.context, this); - this.messageDispatcher.onTaskRunning(runningTask); this.setState(State.RUNNING); } } @@ -140,6 +138,13 @@ public final class TaskRepresenter { " that is believed to be RUNNING on the Evaluator, but the Driver thinks it is in state " + this.state); } + if (isFirstRunningMessage) { + isFirstRunningMessage = false; + final RunningTask runningTask = new RunningTaskImpl( + this.evaluatorManager, this.taskId, this.context, this); + this.messageDispatcher.onTaskRunning(runningTask); + } + // fire driver restart task running handler if this is a recovery heartbeat if (driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == EvaluatorRestartState.REREGISTERED) { final RunningTask runningTask = new RunningTaskImpl( http://git-wip-us.apache.org/repos/asf/reef/blob/7495f78a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java index cb6613e..3596baa 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java @@ -230,6 +230,7 @@ public final class TaskStatus { */ void setRunning() { this.setState(State.RUNNING); + this.heartbeat(); } void setCloseRequested() { http://git-wip-us.apache.org/repos/asf/reef/blob/7495f78a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java index 11d3d3f..d6efa30 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/AllTestsSuite.java @@ -34,6 +34,7 @@ import org.apache.reef.tests.messaging.task.TaskMessagingTest; import org.apache.reef.tests.statepassing.StatePassingTest; import org.apache.reef.tests.subcontexts.SubContextTest; import org.apache.reef.tests.taskresubmit.TaskResubmitTest; +import org.apache.reef.tests.watcher.WatcherTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -57,7 +58,8 @@ import org.junit.runners.Suite; ExamplesTestSuite.class, ConfigurationProviderTest.class, ApplicationTestSuite.class, - RuntimeNameTest.class + RuntimeNameTest.class, + WatcherTest.class, }) public final class AllTestsSuite { }
