[ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15662986#comment-15662986 ]

ASF GitHub Bot commented on FLINK-5048:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2789#discussion_r87742870
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
    @@ -279,32 +152,37 @@ public void run() {
                                                        break;
                                                }
     
    -                                           // emit the actual record. this also update offset state atomically
    +                                           // emit the actual record. this also updates offset state atomically
                                                // and deals with timestamps and watermark generation
                                                emitRecord(value, partition, record.offset(), record);
                                        }
                                }
                        }
    -                   // end main fetch loop
    -           }
    -           catch (Throwable t) {
    -                   if (running) {
    -                           running = false;
    -                           errorHandler.reportError(t);
    -                   } else {
    -                           LOG.debug("Stopped ConsumerThread threw exception", t);
    -                   }
                }
                finally {
    -                   try {
    -                           consumer.close();
    -                   }
    -                   catch (Throwable t) {
    -                           LOG.warn("Error while closing Kafka 0.9 consumer", t);
    -                   }
    +                   // this signals the consumer thread that no more work is to be done
    +                   consumerThread.shutdown();
    +           }
    +
    +           // on a clean exit, wait for the runner thread
    +           try {
    +                   consumerThread.join();
    +           }
    +           catch (InterruptedException e) {
    +                   // may be the result of a wake-up interruption after an exception.
    +                   // we ignore this here and only restore the interruption state
    +                   Thread.currentThread().interrupt();
                }
        }
     
    +   @Override
    +   public void cancel() {
    +           // flag the main thread to exit. A thread interrupt will come anyways.
    +           running = false;
    +           handover.close();
    --- End diff --
    
    We might not need to call `close()` on the handover here. Please see my comments above.
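A minimal, self-contained Java sketch of the `run()`/`cancel()` shape in the diff above, under stated assumptions: `ShutdownJoinSketch` and its nested `ConsumerThread` are hypothetical stand-ins, not Flink's fetcher or consumer-thread classes, and the "fetch" is simulated with a short sleep. It shows three points from the diff: the main loop is wrapped in `try`/`finally` so the consumer thread is always told to shut down, the thread is joined only on a clean exit, and an `InterruptedException` during `join()` merely restores the interrupt flag. The sketch has no handover at all, so it does not settle whether `cancel()` should also close one.

```java
/**
 * Hypothetical sketch (not Flink code) of the run()/cancel() shape in the diff:
 * the main loop is wrapped in try/finally so the consumer thread is always told
 * to shut down; on a clean exit the thread is then joined, restoring the
 * interrupt flag if join() itself is interrupted.
 */
public class ShutdownJoinSketch {

    /** Stand-in for the spawned consumer thread; it is never interrupted here. */
    static class ConsumerThread extends Thread {
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                try {
                    // stand-in for a blocking fetch with a short timeout
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Signals that no more work is to be done; the loop exits on its next check. */
        void shutdown() {
            running = false;
        }
    }

    private volatile boolean running = true;
    private ConsumerThread consumerThread;

    /** Runs up to maxIterations "emit" steps, or none if cancel() was called first. */
    public int run(int maxIterations) {
        consumerThread = new ConsumerThread();
        consumerThread.start();
        int emitted = 0;
        try {
            while (running && emitted < maxIterations) {
                emitted++; // stand-in for taking a batch and emitting its records
            }
        } finally {
            // always signal the consumer thread, even if the loop threw
            consumerThread.shutdown();
        }

        // on a clean exit, wait for the consumer thread to finish
        try {
            consumerThread.join();
        } catch (InterruptedException e) {
            // do not swallow the interrupt: restore the flag for the caller
            Thread.currentThread().interrupt();
        }
        return emitted;
    }

    /** Flags the main loop to exit; this sketch has no handover to close. */
    public void cancel() {
        running = false;
    }

    /** True once the consumer thread has terminated. */
    public boolean consumerFinished() {
        return consumerThread != null && !consumerThread.isAlive();
    }

    public static void main(String[] args) {
        ShutdownJoinSketch sketch = new ShutdownJoinSketch();
        System.out.println(sketch.run(5));             // prints 5
        System.out.println(sketch.consumerFinished()); // prints true
    }
}
```

Because `join()` sits after the `finally` block, an exception thrown from the main loop propagates without waiting for the consumer thread, which matches the "on a clean exit" comment in the diff.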


> Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-5048
>                 URL: https://issues.apache.org/jira/browse/FLINK-5048
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that 
> operates the KafkaConsumer. That thread is shielded from interrupts, because 
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the 
> network stack (backpressure) or in chained operators. The latter case makes 
> cancellation very slow unless that thread is interrupted (which it cannot be).
> I propose to change the threading model as follows:
>   - A spawned consumer thread pulls from the KafkaConsumer and pushes each 
> pulled batch of records into a blocking queue (size one).
>   - The main thread of the task pulls the record batches from the 
> blocking queue and emits the records.
> This actually allows some additional I/O overlap while limiting the 
> additional memory consumption: only two batches are ever held, one being 
> fetched and one being emitted.
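The proposed threading model can be sketched in plain Java, as a minimal illustration under stated assumptions: `HandoverSketch`, `runPipeline`, and the `END` sentinel are hypothetical names, a `List<String>` stands in for a batch of Kafka records, and iterating over an in-memory list stands in for polling the KafkaConsumer. The spawned thread only fetches and hands batches over through a queue of size one; all emitting happens on the calling (main) thread, so a slow or blocked emit no longer sits on the thread that talks to Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical sketch (not Flink's implementation) of the proposed threading
 * model: a spawned thread fetches record batches and pushes them into a
 * blocking queue of size one; the task's main thread takes the batches from
 * the queue and emits their records.
 */
public class HandoverSketch {

    // Sentinel batch marking the end of input; compared by identity.
    private static final List<String> END = new ArrayList<>();

    public static List<String> runPipeline(List<List<String>> source) throws InterruptedException {
        // Size one, as proposed: put() blocks while a batch is still waiting.
        BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);

        Thread consumerThread = new Thread(() -> {
            try {
                for (List<String> batch : source) {
                    handover.put(batch); // stand-in for pushing a polled batch
                }
                handover.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // main thread stopped early
            }
        }, "consumer-thread");
        consumerThread.start();

        List<String> emitted = new ArrayList<>();
        try {
            while (true) {
                List<String> batch = handover.take();
                if (batch == END) {
                    break;
                }
                emitted.addAll(batch); // stand-in for emitting each record downstream
            }
        } finally {
            // Unblocks the consumer thread if it is stuck in put(). Interrupting is
            // acceptable here only because no KafkaConsumer is involved.
            consumerThread.interrupt();
            consumerThread.join();
        }
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<String>> batches = List.of(List.of("a", "b"), List.of("c"), List.of("d", "e"));
        System.out.println(runPipeline(batches)); // prints [a, b, c, d, e]
    }
}
```

One design note on the queue choice: with an `ArrayBlockingQueue` of capacity one, the consumer thread can hold a freshly fetched batch while it blocks in `put()`, in addition to the batch waiting in the queue and the one being emitted. A `java.util.concurrent.SynchronousQueue`, whose `put()` waits until the main thread actually takes the batch, keeps it to the two batches described above.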



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
