[ 
https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657219#comment-15657219
 ] 

ASF GitHub Bot commented on FLINK-5048:
---------------------------------------

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/2789

    [FLINK-5048] [Kafka Consumer] Change thread model of FlinkKafkaConsumer to 
better handle shutdown/interrupt situations

    **NOTE:** Only the second commit is relevant. The first commit only 
prepares for it by cleaning up some code in the Flink Kafka Consumers for 0.9 and 0.10.
    
    ## Rationale
    
    Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate 
thread that operated Kafka's consumer. That thread was shielded from 
interrupts, because the Kafka Consumer has not been handling thread interrupts 
well.
    
    Since that thread was also the thread that emitted records, it would block 
in the network stack (backpressure) or in chained operators. The latter case 
led to situations where cancellations got very slow unless that thread was 
interrupted (which it could not be).
    
    ## Core changes
    
    This commit changes the thread model:
    
      - A spawned consumer thread polls a batch of records from the 
KafkaConsumer and pushes the batch of records into a sort of blocking queue
      - The main thread of the task will pull the record batches from the 
blocking queue and emit the records.
    
    The "batches" are the fetch batches from Kafka's consumer; there is no 
additional buffering that would impact latency.
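
    The two-thread flow described above can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses the JDK's `ArrayBlockingQueue` with capacity one as a stand-in for the handover, plain string lists as stand-ins for Kafka fetch batches, and the names `ThreadModelSketch`, `run` and `END` are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Illustrative sketch only; not the code from this pull request. */
final class ThreadModelSketch {

    /** Marker batch that signals end of input (needed only in this sketch). */
    private static final List<String> END = new ArrayList<>();

    /** Runs both threads and returns the records in the order the main thread emitted them. */
    static List<String> run(List<List<String>> fetchBatches) throws InterruptedException {
        // Capacity one: the consumer thread can be at most one batch ahead in the queue.
        BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);

        // Stand-in for the spawned consumer thread: "polls" batches and hands them over.
        Thread consumerThread = new Thread(() -> {
            try {
                for (List<String> batch : fetchBatches) {
                    handover.put(batch); // blocks while the previous batch is still queued
                }
                handover.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-thread-sketch");
        consumerThread.start();

        // The calling thread plays the role of the task's main thread and emits the records.
        List<String> emitted = new ArrayList<>();
        while (true) {
            List<String> batch = handover.take();
            if (batch == END) {
                break;
            }
            emitted.addAll(batch);
        }
        consumerThread.join();
        return emitted;
    }
}
```

    Because only the main thread emits records, it is the only thread that can block in the network stack or in chained operators, and it is the thread that may safely be interrupted.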
    
    The thread-to-thread handover of the record batches is handled by a class 
`Handover`, which is a size-one blocking queue with the additional ability to 
gracefully wake up the consumer thread if the main thread decides to shut down. 
That way we need no interrupts on the KafkaConsumerThread.
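
    As a rough illustration of the idea (not the actual `Handover` class from this pull request), a size-one handover with a producer wake-up could look like the sketch below. The class name `HandoverSketch`, its method names, and both exception types are assumptions made for this example.

```java
/** Hypothetical size-one handover between a producer thread and a consumer thread. */
final class HandoverSketch<T> {

    /** Signals to the producer that it was woken up while waiting for a free slot. */
    static final class WakeupException extends Exception {}

    /** Signals to either side that the handover has been closed. */
    static final class ClosedException extends Exception {}

    private final Object lock = new Object();
    private T next;                  // the single slot; null means empty
    private boolean wakeupRequested; // set by the main thread to release a blocked producer
    private boolean closed;

    /** Main thread: blocks until a batch is available, then takes it. */
    public T pollNext() throws InterruptedException, ClosedException {
        synchronized (lock) {
            while (next == null && !closed) {
                lock.wait();
            }
            if (closed) {
                throw new ClosedException();
            }
            T batch = next;
            next = null;
            lock.notifyAll(); // the slot is free again
            return batch;
        }
    }

    /** Consumer thread: blocks while the slot is occupied; batch must be non-null. */
    public void produce(T batch)
            throws InterruptedException, WakeupException, ClosedException {
        synchronized (lock) {
            while (next != null && !wakeupRequested && !closed) {
                lock.wait();
            }
            wakeupRequested = false;
            if (closed) {
                throw new ClosedException();
            }
            if (next != null) {
                // Released by wakeupProducer() rather than by a free slot.
                throw new WakeupException();
            }
            next = batch;
            lock.notifyAll(); // a batch is available
        }
    }

    /** Main thread: releases a producer blocked in produce() without interrupting it. */
    public void wakeupProducer() {
        synchronized (lock) {
            wakeupRequested = true;
            lock.notifyAll();
        }
    }

    /** Either thread: closes the handover and drops any pending batch. */
    public void close() {
        synchronized (lock) {
            closed = true;
            next = null;
            lock.notifyAll();
        }
    }
}
```

    In this sketch the wake-up is an ordinary monitor notification, so the producer leaves `produce` through a regular exception and never needs `Thread.interrupt()`.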
    
    This also pulls the KafkaConsumerThread out of the fetcher class for some 
code cleanup (scope simplifications).
    The method calls that were broken between Kafka 0.9 and 0.10 are handled 
via a "call bridge", which leads to fewer code changes in the fetchers for each 
method that needs to be adapted.
    
    ## Tests
    
    This adjusts some tests and removes the "short retention IT Cases" for the 
Kafka 0.9 and 0.10 consumers.
    While that type of test makes sense for the 0.8 consumer, for the newer 
ones the tests exercise only Kafka and no Flink code.
    
    In addition, they are virtually impossible to run reliably and quickly, because 
they rely on an artificial slowdown in the KafkaConsumer threads. That type of 
unhealthy interference is exactly what this patch prevents ;-)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink kafka_consumer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2789.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2789
    
----
commit f6cd417cdf37213f88c62e9342206e249402eac6
Author: Stephan Ewen <[email protected]>
Date:   2016-11-09T16:58:54Z

    [hotfix] [Kafka Consumer] Clean up some code confusion and style in the 
Fetchers for Kafka 0.9/0.10

commit 9a0786508b9a13cd986de593c6bdb2ecdb1737a8
Author: Stephan Ewen <[email protected]>
Date:   2016-11-10T10:13:43Z

    [FLINK-5048] [kafka consumer] Change thread model of FlinkKafkaConsumer to 
better handle shutdown/interrupt situations
    
    Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate 
thread that operated Kafka's consumer.
    That thread was shielded from interrupts, because the Kafka Consumer has not 
been handling thread interrupts well.
    Since that thread was also the thread that emitted records, it would block 
in the network stack (backpressure) or in chained operators.
    The latter case led to situations where cancellations got very slow unless 
that thread was interrupted (which it could not be).
    
    This commit changes the thread model:
      - A spawned consumer thread polls a batch of records from the 
KafkaConsumer and pushes the
        batch of records into a blocking queue (size one)
      - The main thread of the task will pull the record batches from the 
blocking queue and
        emit the records.

----


> Kafka Consumer (0.9/0.10) threading model leads to problematic cancellation 
> behavior
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-5048
>                 URL: https://issues.apache.org/jira/browse/FLINK-5048
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that 
> operates the KafkaConsumer. That thread is shielded from interrupts, because 
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the 
> network stack (backpressure) or in chained operators. The latter case leads to 
> situations where cancellations get very slow unless that thread is 
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
>   - A spawned consumer thread pulls from the KafkaConsumer and pushes its 
> pulled batch of records into a blocking queue (size one)
>   - The main thread of the task will pull the record batches from the 
> blocking queue and emit the records.
> This actually allows for some additional I/O overlap while limiting the 
> additional memory consumption - only two batches are ever held, one being 
> fetched and one being emitted.



