Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/750#discussion_r37999652
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -411,6 +411,23 @@ class TaskManager(
                 log.debug(s"Cannot find task to fail for execution ${executionID})")
               }
     
    +        // stops a task
    +        case StopTask(executionID) =>
    +          val task = runningTasks.get(executionID)
    +          if (task != null) {
    +            try {
    +              task.stopExecution()
    --- End diff --
    
    Hmm, if I followed the control flow correctly, the `stop` operation of the 
`SourceStreamTask` calls the `cancel` method of the `StreamSource`, which in 
turn calls `cancel` on the user function of type `SourceFunction`. Thus, every 
`SourceFunction` implementation must have a non-blocking `cancel`. I haven't 
checked all of them, but I found the `KafkaSource`, which also calls `shutdown` 
on a consumer of type `ZooKeeperConsumerConnector`. The point of this little 
trip through the class dependencies is that it's easy to get things wrong at 
any one of these calls. Moreover, the `Stoppable` interface never says that 
`stop` must be a non-blocking operation. So I'm wondering how you know, and how 
you can guarantee, that it's non-blocking.
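    
    One way to sidestep the question would be to not rely on `stop` being 
non-blocking at all, and to hand the call to a separate thread so that the 
caller returns immediately. A minimal sketch in plain Java with no Flink 
dependencies; `StoppableTask`, `stopAsync` and `measureCallerBlockMillis` are 
hypothetical names made up for illustration, not Flink's API, and the sleep 
merely simulates a blocking shutdown:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class NonBlockingStopSketch {

    /** Hypothetical stand-in for a stoppable task; NOT Flink's actual interface. */
    interface StoppableTask {
        void stop() throws Exception;
    }

    /** Runs stop() on the executor's thread so that the caller never waits for it. */
    static void stopAsync(StoppableTask task, ExecutorService executor) {
        executor.execute(() -> {
            try {
                task.stop();
            } catch (Throwable t) {
                // A real system would fail the task here; the sketch only logs.
                System.err.println("Stopping task failed: " + t);
            }
        });
    }

    /** Returns how many milliseconds the caller spent inside stopAsync. */
    static long measureCallerBlockMillis(long stopDurationMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch stopped = new CountDownLatch(1);

        // Simulates a stop() that blocks, e.g. on a slow consumer shutdown.
        StoppableTask slowTask = () -> {
            Thread.sleep(stopDurationMillis);
            stopped.countDown();
        };

        long start = System.nanoTime();
        stopAsync(slowTask, executor);
        long callerMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        try {
            stopped.await(); // wait only so that the demo terminates cleanly
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return callerMillis;
    }

    public static void main(String[] args) {
        System.out.println("caller blocked for ~" + measureCallerBlockMillis(500) + " ms");
    }
}
```

    With something along these lines the caller no longer depends on every 
`cancel`/`stop` implementation down the chain being non-blocking. The price is 
that failures surface asynchronously and have to be reported back some other 
way.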


