[FLINK-3260] [runtime] Enforce terminal state of Executions

This commit fixes the problem that an Execution could leave its terminal state
FINISHED and transition to FAILED. Such a transition is propagated to the
ExecutionGraph, where it triggers JobStatus changes. Since the Execution has
already reached a terminal state, it should not affect the ExecutionGraph again.
Otherwise, the job can end up in an inconsistent state after a restart, when the
old Executions have been disassociated from the ExecutionGraph.

This closes #1613


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6968a57a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6968a57a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6968a57a

Branch: refs/heads/master
Commit: 6968a57a1a31a11b33bacd2c94d6559bcabd6eb9
Parents: 48b7454
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Feb 9 10:30:12 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 10 15:34:37 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  14 +-
 .../ExecutionGraphRestartTest.java              |  90 +++++++++++++
 .../runtime/testingUtils/TestingCluster.scala   |   6 +-
 .../testingUtils/TestingTaskManagerLike.scala   |   4 +-
 .../runtime/testingUtils/TestingUtils.scala     | 133 ++++++++++++++++++-
 5 files changed, 233 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index eb2e68c..db037bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -435,7 +435,7 @@ public class Execution implements Serializable {
                                return;
                        }
                        else if (current == CREATED || current == SCHEDULED) {
-                               // from here, we can directly switch to 
cancelled, because the no task has been deployed
+                               // from here, we can directly switch to 
cancelled, because no task has been deployed
                                if (transitionState(current, CANCELED)) {
                                        
                                        // we skip the canceling state. set the 
timestamp, for a consistent appearance
@@ -754,11 +754,10 @@ public class Execution implements Serializable {
                                return false;
                        }
 
-                       if (current == CANCELED) {
-                               // we are already aborting or are already 
aborted
+                       if (current == CANCELED || current == FINISHED) {
+                               // we are already aborting or are already 
aborted or we are already finished
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug(String.format("Ignoring 
transition of vertex %s to %s while being %s", 
-                                                       getVertexWithAttempt(), 
FAILED, CANCELED));
+                                       LOG.debug("Ignoring transition of 
vertex {} to {} while being {}.", getVertexWithAttempt(), FAILED, current);
                                }
                                return false;
                        }
@@ -928,6 +927,11 @@ public class Execution implements Serializable {
        }
 
        private boolean transitionState(ExecutionState currentState, 
ExecutionState targetState, Throwable error) {
+               // sanity check
+               if (currentState.isTerminal()) {
+                       throw new IllegalStateException("Cannot leave terminal 
state " + currentState + " to transition to " + targetState + ".");
+               }
+
                if (STATE_UPDATER.compareAndSet(this, currentState, 
targetState)) {
                        markTimestamp(targetState);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 0c3af8f..47a48a2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -36,6 +36,7 @@ import org.junit.Test;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
@@ -387,6 +388,95 @@ public class ExecutionGraphRestartTest {
                assertEquals(1, eg.getNumberOfRetriesLeft());
        }
 
+       /**
+        * Tests that a failing execution does not affect a restarted job. This 
is important if a
+        * callback handler fails an execution after it has already reached a 
final state and the job
+        * has been restarted.
+        */
+       @Test
+       public void testFailingExecutionAfterRestart() throws Exception {
+               Instance instance = ExecutionGraphTestUtils.getInstance(
+                       new 
SimpleActorGateway(TestingUtils.directExecutionContext()),
+                       2);
+
+               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
+               scheduler.newInstanceAvailable(instance);
+
+               JobVertex sender = new JobVertex("Task1");
+               sender.setInvokableClass(Tasks.NoOpInvokable.class);
+               sender.setParallelism(1);
+
+               JobVertex receiver = new JobVertex("Task2");
+               receiver.setInvokableClass(Tasks.NoOpInvokable.class);
+               receiver.setParallelism(1);
+
+               JobGraph jobGraph = new JobGraph("Pointwise job", sender, 
receiver);
+
+               ExecutionGraph eg = new ExecutionGraph(
+                       TestingUtils.defaultExecutionContext(),
+                       new JobID(),
+                       "test job",
+                       new Configuration(),
+                       AkkaUtils.getDefaultTimeout());
+               eg.setNumberOfRetriesLeft(1);
+               
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+               assertEquals(JobStatus.CREATED, eg.getState());
+
+               eg.scheduleForExecution(scheduler);
+               assertEquals(JobStatus.RUNNING, eg.getState());
+
+               Iterator<ExecutionVertex> executionVertices = 
eg.getAllExecutionVertices().iterator();
+
+               Execution finishedExecution = 
executionVertices.next().getCurrentExecutionAttempt();
+               Execution failedExecution = 
executionVertices.next().getCurrentExecutionAttempt();
+
+               finishedExecution.markFinished();
+               failedExecution.fail(new Exception("Test Exception"));
+
+               failedExecution.cancelingComplete();
+
+               FiniteDuration timeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
+
+               Deadline deadline = timeout.fromNow();
+
+               while (deadline.hasTimeLeft() && eg.getState() != 
JobStatus.RUNNING) {
+                       Thread.sleep(100);
+               }
+
+               assertEquals(JobStatus.RUNNING, eg.getState());
+
+               // Wait for deploying after async restart
+               deadline = timeout.fromNow();
+               boolean success = false;
+
+               while (deadline.hasTimeLeft() && !success) {
+                       success = true;
+
+                       for (ExecutionVertex vertex : 
eg.getAllExecutionVertices()) {
+                               if 
(vertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
+                                       success = false;
+                                       Thread.sleep(100);
+                                       break;
+                               } else {
+                                       
vertex.getCurrentExecutionAttempt().switchToRunning();
+                               }
+                       }
+               }
+
+               // fail old finished execution, this should not affect the 
execution
+               finishedExecution.fail(new Exception("This should have no 
effect"));
+
+               for (ExecutionVertex vertex: eg.getAllExecutionVertices()) {
+                       vertex.getCurrentExecutionAttempt().markFinished();
+               }
+
+               // the state of the finished execution should have not changed 
since it is terminal
+               assertEquals(ExecutionState.FINISHED, 
finishedExecution.getState());
+
+               assertEquals(JobStatus.FINISHED, eg.getState());
+       }
+
        private static void restartAfterFailure(ExecutionGraph eg, 
FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
 
                eg.getAllExecutionVertices().iterator().next().fail(new 
Exception("Test Exception"));

http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index bd56040..22b0d29 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -42,9 +42,9 @@ import scala.concurrent.{Await, Future}
  *                          otherwise false
  */
 class TestingCluster(
-                      userConfiguration: Configuration,
-                      singleActorSystem: Boolean,
-                      synchronousDispatcher: Boolean)
+    userConfiguration: Configuration,
+    singleActorSystem: Boolean,
+    synchronousDispatcher: Boolean)
   extends FlinkMiniCluster(
     userConfiguration,
     singleActorSystem) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
index c10e83e..e9dbdde 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.testingUtils
 import akka.actor.{Terminated, ActorRef}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import 
org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
@@ -32,8 +31,7 @@ AcknowledgeRegistration}
 import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, 
UpdateTaskExecutionState, TaskInFinalState}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import 
org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
-CheckIfJobRemoved, Alive}
+import 
org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect, 
CheckIfJobRemoved, Alive}
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
 
 import scala.concurrent.duration._

http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 98faa34..679dc71 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -26,8 +26,11 @@ import com.google.common.util.concurrent.MoreExecutors
 
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
+import org.apache.flink.api.common.JobExecutionResult
 
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.client.JobClient
+import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, 
FlinkActor}
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -168,6 +171,26 @@ object TestingUtils {
   }
 
   def createTaskManager(
+    actorSystem: ActorSystem,
+    jobManager: ActorGateway,
+    configuration: Configuration,
+    useLocalCommunication: Boolean,
+    waitForRegistration: Boolean,
+    taskManagerClass: Class[_ <: TaskManager])
+  : ActorGateway = {
+    val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor)
+
+    createTaskManager(
+      actorSystem,
+      jobManagerURL,
+      configuration,
+      useLocalCommunication,
+      waitForRegistration,
+      taskManagerClass
+    )
+  }
+
+  def createTaskManager(
       actorSystem: ActorSystem,
       jobManager: ActorGateway,
       configuration: Configuration,
@@ -200,11 +223,30 @@ object TestingUtils {
     * @return ActorGateway of the created TaskManager
     */
   def createTaskManager(
+    actorSystem: ActorSystem,
+    jobManagerURL: String,
+    configuration: Configuration,
+    useLocalCommunication: Boolean,
+    waitForRegistration: Boolean)
+  : ActorGateway = {
+    createTaskManager(
+      actorSystem,
+      jobManagerURL,
+      configuration,
+      useLocalCommunication,
+      waitForRegistration,
+      classOf[TestingTaskManager]
+    )
+  }
+
+
+  def createTaskManager(
       actorSystem: ActorSystem,
       jobManagerURL: String,
       configuration: Configuration,
       useLocalCommunication: Boolean,
-      waitForRegistration: Boolean)
+      waitForRegistration: Boolean,
+      taskManagerClass: Class[_ <: TaskManager])
     : ActorGateway = {
 
     val resultingConfiguration = new Configuration()
@@ -222,7 +264,7 @@ object TestingUtils {
       None,
       leaderRetrievalService,
       useLocalCommunication,
-      classOf[TestingTaskManager]
+      taskManagerClass
     )
 
     if (waitForRegistration) {
@@ -264,6 +306,72 @@ object TestingUtils {
       actorSystem: ActorSystem,
       configuration: Configuration)
     : ActorGateway = {
+    createJobManager(
+      actorSystem,
+      configuration,
+      classOf[TestingJobManager]
+    )
+  }
+
+  def createJobManager(
+      actorSystem: ActorSystem,
+      configuration: Configuration,
+      executionContext: ExecutionContext)
+    : ActorGateway = {
+
+    val (_,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    executionRetries,
+    delayBetweenRetries,
+    timeout,
+    archiveCount,
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory) = JobManager.createJobManagerComponents(
+      configuration,
+      None
+    )
+
+    val archiveProps = Props(classOf[TestingMemoryArchivist], archiveCount)
+
+    val archive: ActorRef = actorSystem.actorOf(archiveProps, 
JobManager.ARCHIVE_NAME)
+
+    val jobManagerProps = Props(
+      classOf[TestingJobManager],
+      configuration,
+      executionContext,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archive,
+      executionRetries,
+      delayBetweenRetries,
+      timeout,
+      leaderElectionService,
+      submittedJobGraphs,
+      checkpointRecoveryFactory)
+
+    val jobManager: ActorRef = actorSystem.actorOf(jobManagerProps, 
JobManager.JOB_MANAGER_NAME)
+
+    new AkkaActorGateway(jobManager, null)
+  }
+
+
+  /**
+    * Creates a JobManager of the given class using the default recovery mode 
(standalone)
+    *
+    * @param actorSystem ActorSystem to use
+    * @param configuration Configuration to use
+    * @param jobManagerClass JobManager class to instantiate
+    * @return
+    */
+  def createJobManager(
+      actorSystem: ActorSystem,
+      configuration: Configuration,
+      jobManagerClass: Class[_ <: JobManager])
+    : ActorGateway = {
 
     configuration.setString(ConfigConstants.RECOVERY_MODE, 
ConfigConstants.DEFAULT_RECOVERY_MODE)
 
@@ -272,7 +380,7 @@ object TestingUtils {
         actorSystem,
         Some(JobManager.JOB_MANAGER_NAME),
         Some(JobManager.ARCHIVE_NAME),
-        classOf[TestingJobManager],
+        jobManagerClass,
         classOf[MemoryArchivist])
 
     new AkkaActorGateway(actor, null)
@@ -312,6 +420,25 @@ object TestingUtils {
     new AkkaActorGateway(actor, null)
   }
 
+  def submitJobAndWait(
+      actorSystem: ActorSystem,
+      jobManager: ActorGateway,
+      jobGraph: JobGraph)
+    : JobExecutionResult = {
+
+    val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor)
+    val leaderRetrievalService = new 
StandaloneLeaderRetrievalService(jobManagerURL)
+
+    JobClient.submitJobAndWait(
+      actorSystem,
+      leaderRetrievalService,
+      jobGraph,
+      TESTING_DURATION,
+      false,
+      Thread.currentThread().getContextClassLoader
+    )
+  }
+
   class ForwardingActor(val target: ActorRef, val leaderSessionID: 
Option[UUID])
     extends FlinkActor with LeaderSessionMessageFilter with LogMessages {
 

Reply via email to