[
https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657219#comment-15657219
]
ASF GitHub Bot commented on FLINK-5048:
---------------------------------------
GitHub user StephanEwen opened a pull request:
https://github.com/apache/flink/pull/2789
[FLINK-5048] [Kafka Consumer] Change thread model of FlinkKafkaConsumer to
better handle shutdown/interrupt situations
**NOTE:** Only the second commit is relevant. The first commit only
prepares for it by cleaning up some code in the Flink Kafka Consumers for 0.9
and 0.10.
## Rationale
Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate
thread that operated Kafka's consumer. That thread was shielded from
interrupts, because the Kafka Consumer has not been handling thread interrupts
well.
Since that thread was also the thread that emitted records, it would block
in the network stack (backpressure) or in chained operators. The latter case
led to situations where cancellations got very slow unless that thread was
interrupted (which it could not be).
## Core changes
This commit changes the thread model:
- A spawned consumer thread polls a batch of records from the
KafkaConsumer and pushes the batch of records into a sort of blocking queue
- The main thread of the task will pull the record batches from the
blocking queue and emit the records.
The "batches" are the fetch batches from Kafka's consumer; there is no
additional buffering that would impact latency.
The thread-to-thread handover of the record batches is handled by a class
`Handover`, which is a size-one blocking queue with the additional ability to
gracefully wake up the consumer thread if the main thread decides to shut down.
That way we need no interrupts on the KafkaConsumerThread.
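The handover idea can be sketched roughly as follows. This is a minimal illustration under assumptions, not the `Handover` class from the pull request: the class name `HandoverSketch`, the methods `produce`, `pollNext` and `close`, and the `ClosedException` type are all hypothetical, and error propagation from the consumer thread is left out. It only shows how a single-slot queue lets the main thread wake up a blocked producer without interrupting it.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of a size-one handover; not the class from the pull request. */
final class HandoverSketch<T> {

    /** Signals that the handover was closed by the main thread. */
    static final class ClosedException extends Exception {}

    private final Object lock = new Object();
    private T next;          // the single slot; null means "empty"
    private boolean closed;  // set once by close()

    /** Consumer thread: hands over one batch, blocking while the slot is full. */
    void produce(T batch) throws InterruptedException, ClosedException {
        synchronized (lock) {
            while (next != null && !closed) {
                lock.wait();
            }
            if (closed) {
                throw new ClosedException();
            }
            next = batch;
            lock.notifyAll();
        }
    }

    /** Main thread: takes the next batch, blocking while the slot is empty. */
    T pollNext() throws InterruptedException, ClosedException {
        synchronized (lock) {
            while (next == null && !closed) {
                lock.wait();
            }
            if (closed) {
                throw new ClosedException();
            }
            T batch = next;
            next = null;
            lock.notifyAll();
            return batch;
        }
    }

    /** Main thread: wakes up a blocked producer without interrupting it. */
    void close() {
        synchronized (lock) {
            closed = true;
            next = null;
            lock.notifyAll();
        }
    }

    public static void main(String[] args) throws Exception {
        HandoverSketch<List<String>> handover = new HandoverSketch<>();

        // Stand-in for the consumer thread: "polls" batches until told to stop.
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    handover.produce(Arrays.asList("record-" + i + "a", "record-" + i + "b"));
                }
            } catch (ClosedException e) {
                // graceful shutdown requested by the main thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumerThread.start();

        // Stand-in for the task's main thread: emits three batches, then shuts down.
        for (int i = 0; i < 3; i++) {
            System.out.println(handover.pollNext());
        }
        handover.close();
        consumerThread.join();
    }
}
```

In this sketch only the main thread's `pollNext` is ever expected to be interrupted (for example on task cancellation). The producer side leaves its blocking `produce` call through `close()`, which is the property the description above relies on.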
This also pulls the KafkaConsumerThread out of the fetcher class for some
code cleanup (scope simplifications).
The method calls that were broken between Kafka 0.9 and 0.10 are handled
via a "call bridge", which leads to fewer code changes in the fetchers for each
method that needs to be adapted.
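A "call bridge" of this kind might look roughly like the sketch below. Everything in it is an assumption made for illustration: `OldClient` and `NewClient` are stand-in types, not Kafka classes, and `CallBridge`, `assignPartitions` and `startFetching` are hypothetical names. In a real setup each bridge implementation would presumably be compiled against its own client version; here both live in one file only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical sketch of a "call bridge"; the client types are stand-ins, not Kafka classes. */
final class CallBridgeSketch {

    /** Stand-in for the older client version. */
    static final class OldClient {
        final List<String> assigned = new ArrayList<>();
        void assign(List<String> partitions) { assigned.addAll(partitions); }
    }

    /** Stand-in for the newer client, where the same call has a different signature. */
    static final class NewClient {
        final List<String> assigned = new ArrayList<>();
        void assign(Collection<String> partitions) { assigned.addAll(partitions); }
    }

    /** The bridge: one method per call that differs between client versions. */
    interface CallBridge<C> {
        void assignPartitions(C client, List<String> partitions);
    }

    /** Version-independent fetcher logic; it only ever talks to the bridge. */
    static <C> void startFetching(C client, CallBridge<C> bridge, List<String> partitions) {
        bridge.assignPartitions(client, partitions);
        // ... the shared poll/emit loop would follow here ...
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("topic-0", "topic-1");

        OldClient oldClient = new OldClient();
        startFetching(oldClient, (c, p) -> c.assign(p), partitions);

        NewClient newClient = new NewClient();
        startFetching(newClient, (c, p) -> c.assign(p), partitions);

        System.out.println(oldClient.assigned + " " + newClient.assigned);
    }
}
```

The point of the design is that the shared fetcher logic never calls the version-specific method directly, so only the small bridge has to differ per client version.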
## Tests
This adjusts some tests, but it removes the "short retention IT Cases" for
Kafka 0.9 and 0.10 consumers.
While that type of test makes sense for the 0.8 consumer, for the newer
ones the tests actually test purely Kafka and no Flink code.
In addition, they are virtually impossible to run stably and quickly, because
they rely on an artificial slowdown in the KafkaConsumer threads. That type of
unhealthy interference is exactly what this patch prevents ;-)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StephanEwen/incubator-flink kafka_consumer
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2789.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2789
----
commit f6cd417cdf37213f88c62e9342206e249402eac6
Author: Stephan Ewen <[email protected]>
Date: 2016-11-09T16:58:54Z
[hotfix] [Kafka Consumer] Clean up some code confusion and style in the
Fetchers for Kafka 0.9/0.10
commit 9a0786508b9a13cd986de593c6bdb2ecdb1737a8
Author: Stephan Ewen <[email protected]>
Date: 2016-11-10T10:13:43Z
[FLINK-5048] [kafka consumer] Change thread model of FlinkKafkaConsumer to
better handle shutdown/interrupt situations
Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate
thread that operated Kafka's consumer.
That thread was shielded from interrupts, because the Kafka Consumer has not
been handling thread interrupts well.
Since that thread was also the thread that emitted records, it would block
in the network stack (backpressure) or in chained operators.
The latter case led to situations where cancellations got very slow unless
that thread was interrupted (which it could not be).
This commit changes the thread model:
- A spawned consumer thread polls a batch of records from the
KafkaConsumer and pushes the
batch of records into a blocking queue (size one)
- The main thread of the task will pull the record batches from the
blocking queue and
emit the records.
----
> Kafka Consumer (0.9/0.10) threading model leads to problematic cancellation
> behavior
> ---------------------------------------------------------------------------------
>
> Key: FLINK-5048
> URL: https://issues.apache.org/jira/browse/FLINK-5048
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.3
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that
> operates the KafkaConsumer. That thread is shielded from interrupts, because
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the
> network stack (backpressure) or in chained operators. The latter case leads to
> situations where cancellations get very slow unless that thread is
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
> - A spawned consumer thread pulls from the KafkaConsumer and pushes its
> pulled batch of records into a blocking queue (size one)
> - The main thread of the task will pull the record batches from the
> blocking queue and emit the records.
> This actually allows for some additional I/O overlap while limiting the
> additional memory consumption - only two batches are ever held, one being
> fetched and one being emitted.