[ 
https://issues.apache.org/jira/browse/FLINK-17723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123926#comment-17123926
 ] 

Stephan Ewen commented on FLINK-17723:
--------------------------------------

*TL;DR The threading model is single-threaded, like an actor or mailbox-based 
system. You never need to worry about concurrency in your operators*
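
To make that concrete, here is a minimal illustrative sketch (my example, not 
code from Flink): a keyed function that keeps a plain, non-volatile field. 
Because processElement() and onTimer() are both executed by the task's single 
mailbox thread, no locking or volatile is needed.

{code:java}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountingFunction extends KeyedProcessFunction<String, String, Long> {

    // Plain field: shared across keys of this subtask, but only ever touched
    // by the single task thread, so no synchronization is required.
    private long seen;

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        seen++;
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) {
        // Runs on the same thread as processElement(), never concurrently with it.
        out.collect(seen);
    }
}
{code}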

+The longer story and historical background:+

The threading model in earlier versions of the DataStream API was "effectively 
single-threaded". There were multiple threads involved (processing, timers, 
checkpointing), but they used a "global lock" (the checkpoint lock) to stay 
mutually exclusive at all times. So as a user, you could program as if it was 
single-threaded.
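
For illustration, this is the kind of pattern the "global lock" implied in the 
legacy SourceFunction API, which exposed the checkpoint lock directly (a 
sketch, not code from a real connector):

{code:java}
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LegacyStyleSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long next = 0;
        while (running) {
            // The checkpoint lock is the "global lock" mentioned above: emitting
            // under it keeps processing, timers, and checkpoints mutually exclusive.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next++);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}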

Because we saw that all our threads were globally mutex-ed, we decided that 
this would be a pretty natural fit for a mailbox model, which was introduced 
in Flink 1.9.
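
Conceptually, a mailbox model just means one thread draining a queue of 
actions. A toy sketch of the idea (nothing like Flink's actual implementation):

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MailboxSketch {

    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

    /** Other threads (timer triggers, checkpoint RPCs, ...) enqueue actions here. */
    public void post(Runnable action) {
        mailbox.add(action);
    }

    /** The single task thread runs this loop; actions can never interleave. */
    public void runMailboxLoop() throws InterruptedException {
        while (true) {
            mailbox.take().run();
        }
    }
}
{code}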

The threading model in some of the connectors, especially the earlier batch 
connectors, was originally a bit more complex. There were frequently multiple 
threads involved, like an extra thread interacting with the client that talks 
to the external system. For HDFS, the connection build-up was in a separate 
thread, and the communication was then in the main task thread.
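
A hypothetical sketch of that connector pattern (not actual Kafka/HDFS 
connector code): a dedicated thread talks to the external client and hands 
records to the main task thread through a bounded queue.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FetcherSketch<T> {

    private final BlockingQueue<T> handover = new ArrayBlockingQueue<>(1000);

    /** Hypothetical stand-in for a client library talking to the external system. */
    public interface ExternalClient<T> {
        T poll() throws InterruptedException;
    }

    /** Runs in a separate "client" thread, keeping blocking I/O off the task thread. */
    public void fetchLoop(ExternalClient<T> client) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            handover.put(client.poll());
        }
    }

    /** Called from the main task thread, which then emits the record downstream. */
    public T nextRecord() throws InterruptedException {
        return handover.take();
    }
}
{code}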

The reason for that complex threading model in connectors was bugs in the 
Kafka and HDFS clients. In previous versions, both had pretty rough behavior 
when the threads interacting with them were interrupted at the wrong time. 
Both systems had some incorrect handling of their loops and 
InterruptedExceptions that could lead to livelocks if a thread was interrupted 
at the wrong point: the thread would go into a CPU-burning hot loop without a 
chance to stop it. Because Flink interrupts the task threads when cancelling 
tasks (to speed up cancellation), we decided to move these clients into a 
"safe thread" to avoid this issue (and make the cancellation experience 
smoother than "kill the JVM").
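
As an illustration of the kind of client bug meant here (my sketch, not actual 
Kafka/HDFS code): a retry loop that swallows InterruptedException without 
restoring the interrupt flag or giving up, so once the task thread is 
interrupted during cancellation, the loop spins at full CPU forever.

{code:java}
public class RetryLoopSketch {

    public void writeWithRetries(byte[] data) {
        while (true) {
            try {
                tryWrite(data); // pretend this is blocking, interruptible I/O
                return;
            } catch (InterruptedException e) {
                // BUG: the interrupt is swallowed; no Thread.currentThread().interrupt()
                // and no exit. Once interrupted, every tryWrite() fails immediately,
                // so the loop turns into a CPU-burning hot loop.
            }
        }
    }

    private void tryWrite(byte[] data) throws InterruptedException {
        if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedException();
        }
        // ... real I/O would happen here ...
    }
}
{code}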

I hope that helps with the confusion and explains why some parts of the code 
look like multi-threaded code even though the APIs today are strictly 
"single-threaded mailbox" style.

> Written design for Flink threading model and guarantees made to the various 
> structural components
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17723
>                 URL: https://issues.apache.org/jira/browse/FLINK-17723
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>            Reporter: John Lonergan
>            Priority: Major
>
> I enjoy using Flink but ...
> Do we have a written design for the threading model, including the 
> guarantees made by the core framework in terms of threading and concurrency?
> Looking at various existing components such as the JDBC and file sinks and 
> other non-core facilities, I am having some difficulty understanding the 
> intended design.
> I want to understand the assumptions I can make about when certain functions 
> will be called (for example JDBCOutputFormat open vs flush vs writeRecord vs 
> close) and whether this will always be from the same thread or some other 
> thread, or whether they might be called concurrently, in order to verify the 
> correctness of the code. 
> What guarantees are there?
> Does a certain reference need a volatile or even synchronisation, or not?
> What's the design for threading?
> If the intended design is not written down, then we have to infer it from 
> the code, and we will definitely come to different conclusions and thus 
> bugs, leaks, and other avoidable horrors.
> It's really hard writing good MT code and a strong design is necessary to 
> provide a framework for the code.
> There is some info here 
> https://flink.apache.org/contributing/code-style-and-quality-common.html#concurrency-and-threading
>  , but this isn't a design and doesn't say how it's meant to work. However, 
> that page does agree that writing MT code is very hard, and this just 
> underlines the need for a strong and detailed design for this aspect. 
> ==
> Another supporting example. 
> When I see code like this ...
> FileOutputFormat
>     
> {code:java}
>     public void close() throws IOException {
>         final FSDataOutputStream s = this.stream;
>         if (s != null) {
>             this.stream = null;
>             s.close();
>         }
>     }
> {code}
> My feeling is that the author wasn't sure what the right approach was.
> I can only guess that the author was concerned that someone else was going 
> to call the function concurrently, or mess with the class state by some 
> other means. And if that were true, would this code even be MT-safe? Who 
> knows. Ought there to be a volatile in there, or a plain old sync? Or 
> perhaps none of the caution is needed at all (framework guarantees 
> preventing the need)?
> Or take a look at the extensive synchronisation efforts in 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
>  Is this code correct? Not to mention the fact that this close() method 
> might throw an NPE if there is any possibility that 'this.outputCommitter' 
> might not have been initialised in open(), or if the framework can ever call 
> close() without open() having completed.
> I find it worrying that I see a lot of code in the project with similarly 
> uncertain and inconsistent synchronisation and resource management.
> I would have hoped that the underlying core framework provided guarantees 
> that avoided the need to have extensive synchronisation effort in derived or 
> auxiliary classes.
> What's the design?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
