[
https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15662986#comment-15662986
]
ASF GitHub Bot commented on FLINK-5048:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2789#discussion_r87742870
--- Diff:
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
---
@@ -279,32 +152,37 @@ public void run() {
break;
}
- // emit the actual record. this also update offset state atomically
+ // emit the actual record. this also updates offset state atomically
// and deals with timestamps and watermark generation
emitRecord(value, partition, record.offset(), record);
}
}
}
- // end main fetch loop
- }
- catch (Throwable t) {
- if (running) {
- running = false;
- errorHandler.reportError(t);
- } else {
- LOG.debug("Stopped ConsumerThread threw exception", t);
- }
}
finally {
- try {
- consumer.close();
- }
- catch (Throwable t) {
- LOG.warn("Error while closing Kafka 0.9 consumer", t);
- }
+ // this signals the consumer thread that no more work is to be done
+ consumerThread.shutdown();
+ }
+
+ // on a clean exit, wait for the runner thread
+ try {
+ consumerThread.join();
+ }
+ catch (InterruptedException e) {
+ // may be the result of a wake-up interruption after an exception.
+ // we ignore this here and only restore the interruption state
+ Thread.currentThread().interrupt();
}
}
+ @Override
+ public void cancel() {
+ // flag the main thread to exit. A thread interrupt will come anyways.
+ running = false;
+ handover.close();
--- End diff --
We might not need to call `close()` on the handover here. Please see my
comments above.
> Kafka Consumer (0.9/0.10) threading model leads problematic cancellation
> behavior
> ---------------------------------------------------------------------------------
>
> Key: FLINK-5048
> URL: https://issues.apache.org/jira/browse/FLINK-5048
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.3
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that
> operates the KafkaConsumer. That thread is shielded from interrupts, because
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the
> network stack (backpressure) or in chained operators. The latter case leads to
> situations where cancellations get very slow unless that thread is
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
> - A spawned consumer thread pulls from the KafkaConsumer and pushes its
> pulled batch of records into a blocking queue (size one)
> - The main thread of the task will pull the record batches from the
> blocking queue and emit the records.
> This actually allows for some additional I/O overlap while limiting the
> additional memory consumption - only two batches are ever held, one being
> fetched and one being emitted.
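
The thread model proposed in the issue can be illustrated with a minimal sketch. It is not the code from the pull request and does not use Flink's or Kafka's classes: `HandoverSketch`, `runDemo`, and the `END` marker are hypothetical names, the "fetched" batches are plain lists of strings, and a `java.util.concurrent.ArrayBlockingQueue` of capacity one stands in for the blocking queue mentioned above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class HandoverSketch {

    // Hypothetical end-of-input marker, compared by identity. Not part of Flink or Kafka.
    private static final List<String> END = new ArrayList<>();

    /** Hands batches from a fetcher thread to the calling thread through a queue of capacity one. */
    static List<String> runDemo(List<List<String>> batches) {
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(1);

        // Stand-in for the spawned consumer thread: it only fetches and hands over.
        Thread fetcher = new Thread(() -> {
            try {
                for (List<String> batch : batches) {
                    queue.put(batch); // blocks while the previous batch is still queued
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "fetcher-sketch");
        fetcher.start();

        // The calling thread plays the task's main thread: it takes batches and emits records.
        List<String> emitted = new ArrayList<>();
        try {
            while (true) {
                List<String> batch = queue.take(); // responds to thread interrupts
                if (batch == END) {
                    break;
                }
                emitted.addAll(batch);
            }
            fetcher.join();
        } catch (InterruptedException e) {
            // In this sketch the fetcher is an ordinary thread, so it is simply interrupted too.
            fetcher.interrupt();
            Thread.currentThread().interrupt();
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<List<String>> batches =
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(runDemo(batches));
    }
}
```

The point of the split is that the emitting thread only ever blocks in `take()` or in downstream code, both of which can be interrupted on cancellation, while the fetching thread never emits records itself.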