GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/2789

    [FLINK-5048] [Kafka Consumer] Change thread model of FlinkKafkaConsumer to 
better handle shutdown/interrupt situations

    **NOTE:** Only the second commit is relevant; the first commit merely 
prepares the change by cleaning up some code in the Flink Kafka consumers for 0.9 and 0.10
    
    ## Rationale
    
    Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate 
thread that operated Kafka's consumer. That thread was shielded from 
interrupts, because the Kafka consumer does not handle thread interrupts 
well.
    
    Since that thread was also the thread that emitted records, it would block 
in the network stack (backpressure) or in chained operators. The latter case 
led to situations where cancellation became very slow unless that thread was 
interrupted (which it could not be).
    
    ## Core changes
    
    This commit changes the thread model:
    
      - A spawned consumer thread polls a batch of records from the 
KafkaConsumer and pushes that batch into a blocking queue
      - The main thread of the task pulls the record batches from the 
blocking queue and emits the records.
    
    The "batches" are simply the fetch batches from Kafka's consumer; there is no 
additional buffering that would impact latency.
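    This two-thread structure can be sketched as follows (illustrative only: a 
plain size-one `ArrayBlockingQueue` stands in for the actual handover class, and 
the record/batch names are made up):

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    class TwoThreadSketch {

        /** Runs the two-thread handover once and returns the records emitted in order. */
        static List<String> runPipeline() throws InterruptedException {
            // Size-one queue: the consumer thread hands over one fetched batch at a time.
            BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);

            // Stand-in for the KafkaConsumerThread: polls batches and pushes them.
            Thread consumerThread = new Thread(() -> {
                try {
                    handover.put(List.of("record-1", "record-2")); // first fetch batch
                    handover.put(List.of("record-3"));             // blocks until the slot is free
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumerThread.start();

            // Main task thread: pulls batches from the queue and emits the records.
            List<String> emitted = new ArrayList<>();
            for (int batches = 0; batches < 2; batches++) {
                emitted.addAll(handover.take());
            }
            consumerThread.join();
            return emitted;
        }

        public static void main(String[] args) throws InterruptedException {
            System.out.println(runPipeline()); // [record-1, record-2, record-3]
        }
    }
    ```

    Because the queue has size one, the consumer thread naturally blocks as soon 
as the main thread falls behind, so backpressure never stalls the thread that 
talks to Kafka's network client.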
    
    The thread-to-thread handover of the record batches is handled by a class 
`Handover`, which is essentially a size-one blocking queue with the additional 
ability to gracefully wake up the consumer thread if the main thread decides to 
shut down. That way, no interrupts are needed on the KafkaConsumerThread.
    
    This also pulls the KafkaConsumerThread out of the fetcher class for some 
code cleanup (scope simplifications).
    The method calls whose signatures changed between Kafka 0.9 and 0.10 are 
handled via a "call bridge", which results in fewer code changes in the 
fetchers for each method that needs to be adapted.
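    The call-bridge idea can be illustrated with toy client classes (all names 
below are made up for the illustration; the real bridge adapts actual 
KafkaConsumer methods):

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    // Toy stand-ins for two client versions whose assign() signatures differ,
    // loosely analogous to the Kafka 0.9 vs. 0.10 situation.
    class ConsumerV9 {
        List<String> assigned;
        void assign(String... partitions) { assigned = Arrays.asList(partitions); }
    }

    class ConsumerV10 {
        List<String> assigned;
        void assign(Collection<String> partitions) { assigned = new ArrayList<>(partitions); }
    }

    // The bridge: fetcher code calls only this interface; each version module
    // supplies one small implementation that adapts to its client's signature.
    interface CallBridge<C> {
        void assignPartitions(C consumer, List<String> partitions);
    }

    class CallBridge09 implements CallBridge<ConsumerV9> {
        public void assignPartitions(ConsumerV9 consumer, List<String> partitions) {
            consumer.assign(partitions.toArray(new String[0]));
        }
    }

    class CallBridge010 implements CallBridge<ConsumerV10> {
        public void assignPartitions(ConsumerV10 consumer, List<String> partitions) {
            consumer.assign(partitions);
        }
    }
    ```

    The shared fetcher code stays identical across versions; only the bridge 
implementations differ, one method per incompatible call.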
    
    ## Tests
    
    This adjusts some tests, but it removes the "short retention" IT cases for 
the Kafka 0.9 and 0.10 consumers.
    While that type of test makes sense for the 0.8 consumer, for the newer 
consumers those tests exercise purely Kafka code and no Flink code.
    
    In addition, they are virtually impossible to run stably and fast, because 
they rely on an artificial slowdown in the KafkaConsumer threads. That type of 
unhealthy interference is exactly what this patch prevents ;-)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink kafka_consumer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2789.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2789
    
----
commit f6cd417cdf37213f88c62e9342206e249402eac6
Author: Stephan Ewen <[email protected]>
Date:   2016-11-09T16:58:54Z

    [hotfix] [Kafka Consumer] Clean up some code confusion and style in the 
Fetchers for Kafka 0.9/0.10

commit 9a0786508b9a13cd986de593c6bdb2ecdb1737a8
Author: Stephan Ewen <[email protected]>
Date:   2016-11-10T10:13:43Z

    [FLINK-5048] [kafka consumer] Change thread model of FlinkKafkaConsumer to 
better handle shutdown/interrupt situations
    
    Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate 
thread that operated Kafka's consumer.
    That thread was shielded from interrupts, because the Kafka consumer does not 
handle thread interrupts well.
    Since that thread was also the thread that emitted records, it would block 
in the network stack (backpressure) or in chained operators.
    The latter case led to situations where cancellation became very slow unless 
that thread was interrupted (which it could not be).
    
    This commit changes the thread model:
      - A spawned consumer thread polls a batch of records from the 
KafkaConsumer and pushes the
        batch of records into a blocking queue (size one)
      - The main thread of the task pulls the record batches from the 
blocking queue and
        emits the records.

----

