Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/750#discussion_r49463254
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
    @@ -427,6 +427,28 @@ class JobManager(
               )
           }
     
    +    case StopJob(jobID) =>
    +      log.info(s"Trying to stop job with ID $jobID.")
    +
    +      currentJobs.get(jobID) match {
    +        case Some((executionGraph, _)) =>
    +          try {
    +            if (executionGraph.getState() != JobStatus.RUNNING) {
    --- End diff --
    
    This guard is not sufficient since job state changes can happen 
asynchronously. Thus, it might happen that the current state is 
`JobStatus.RUNNING` when checking this condition, but before you execute 
`executionGraph.stop()` it will change to `JobStatus.RESTARTING`, for example. 
As a consequence, the sender will receive a `StoppingSuccess` message even 
though the job won't be stopped.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to