[ 
https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youjun Yuan updated FLINK-9630:
-------------------------------
    Description: 
When the Kafka topic has been deleted, Kafka09PartitionDiscoverer gets a 
*TopicAuthorizationException* in getAllPartitionsForTopics() during task 
startup, and it never gets a chance to close the kafkaConsumer, resulting in a 
leaked TCP connection to the Kafka broker.

 

*This issue can bring down the whole Flink cluster*: in a default setup 
(fixedDelay restart strategy with Integer.MAX_VALUE restart attempts), the 
JobManager will schedule the job to any TaskManager that has a free slot, and 
each attempt causes that TaskManager to leak one TCP connection. Eventually 
almost every TaskManager runs out of file handles, so no TaskManager can take a 
snapshot or accept a new job, which effectively stops the whole cluster.

 

The leak happens when StreamTask.invoke() calls openAllOperators(), and 
FlinkKafkaConsumerBase.open() then calls partitionDiscoverer.discoverPartitions(). 
When kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
*TopicAuthorizationException*, nobody catches it.

Although StreamTask.open catches the exception and invokes the dispose() method 
of each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), 
cancel() does not close the kafkaConsumer held by the partitionDiscoverer. It 
does not even invoke partitionDiscoverer.wakeup(), because discoveryLoopThread 
is still null at that point.

 

Below is the code of FlinkKafkaConsumerBase.cancel() for convenience:

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {

        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
            partitionDiscoverer.wakeup();
        }

        // the discovery loop may currently be sleeping in-between
        // consecutive discoveries; interrupt to shutdown faster
        discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
        kafkaFetcher.cancel();
    }
}
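The leak can be avoided by closing the consumer when partition discovery fails. 
Below is a minimal, self-contained Java sketch of that pattern; the class and 
method names are simplified stand-ins for the Flink/Kafka ones, not the actual 
implementation:

```java
import java.util.List;

public class PartitionDiscovererSketch {

    // Minimal stand-in for org.apache.kafka.clients.consumer.KafkaConsumer
    static class FakeConsumer implements AutoCloseable {
        boolean closed = false;

        List<String> partitionsFor(String topic) {
            // Simulate the broker rejecting the request for a deleted topic
            throw new RuntimeException("TopicAuthorizationException: " + topic);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static List<String> getAllPartitionsForTopic(FakeConsumer consumer, String topic) {
        try {
            return consumer.partitionsFor(topic);
        } catch (RuntimeException e) {
            // The fix: release the consumer (and its TCP connection to the
            // broker) before propagating the failure to the task
            consumer.close();
            throw e;
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        try {
            getAllPartitionsForTopic(consumer, "deleted-topic");
        } catch (RuntimeException expected) {
            // the task fails and is restarted by the JobManager,
            // but no connection is leaked
        }
        System.out.println("consumer closed: " + consumer.closed);
    }
}
```

With this pattern, each restart attempt fails cleanly instead of leaking a file 
handle on the TaskManager.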

 

 

  was:
When the Kafka topic has been deleted, Kafka09PartitionDiscoverer gets a 
*TopicAuthorizationException* in getAllPartitionsForTopics() during task 
startup, and it never gets a chance to close the kafkaConsumer, resulting in a 
leaked TCP connection to the Kafka broker.

 

*This issue can bring down the whole Flink cluster*: in a default setup 
(fixedDelay restart strategy with Integer.MAX_VALUE restart attempts), the 
JobManager will schedule the job to any TaskManager that has a free slot, and 
each attempt causes that TaskManager to leak one TCP connection. Eventually 
almost every TaskManager runs out of file handles, so no TaskManager can take a 
snapshot or accept a new job, which effectively stops the whole cluster.

 

The leak happens when StreamTask.invoke() calls openAllOperators(), and 
FlinkKafkaConsumerBase.open() then calls partitionDiscoverer.discoverPartitions(). 
When kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
*TopicAuthorizationException*, nobody catches it.

Although StreamTask.open catches the exception and invokes the dispose() method 
of each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), 
cancel() does not close the kafkaConsumer held by the partitionDiscoverer. It 
does not even invoke partitionDiscoverer.wakeup(), because discoveryLoopThread 
is still null at that point.

 

Below is the code of FlinkKafkaConsumerBase.cancel() for convenience:

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {

        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
            partitionDiscoverer.wakeup();
        }

        // the discovery loop may currently be sleeping in-between
        // consecutive discoveries; interrupt to shutdown faster
        discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
        kafkaFetcher.cancel();
    }
}


I tried to fix it by catching *TopicAuthorizationException* in 
Kafka09PartitionDiscoverer.getAllPartitionsForTopics() and closing the 
kafkaConsumer there. This has been verified to work.

So I'd like to take this issue.

> Kafka09PartitionDiscoverer cause connection leak on 
> TopicAuthorizationException
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-9630
>                 URL: https://issues.apache.org/jira/browse/FLINK-9630
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.0, 1.4.2
>         Environment: Linux 2.6, java 8, Kafka broker 0.10.x
>            Reporter: Youjun Yuan
>            Priority: Major
>             Fix For: 1.5.1
>
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
