[ 
https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-24987:
---------------------------------
    Affects Version/s:     (was: 2.3.0)

> Kafka Cached Consumer Leaking File Descriptors
> ----------------------------------------------
>
>                 Key: SPARK-24987
>                 URL: https://issues.apache.org/jira/browse/SPARK-24987
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>         Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>  
>            Reporter: Yuval Itzchakov
>            Assignee: Yuval Itzchakov
>            Priority: Critical
>             Fix For: 2.3.2, 2.4.0
>
>
> Setup:
>  * Spark 2.3.1
>  * Java 1.8.0 (112)
>  * Standalone Cluster Manager
>  * 3 Nodes, 1 Executor per node.
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers 
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
>  via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, haven't 
> been able to find the root cause as of yet) where cached consumers remain "in 
> use" throughout the life time of the task and are never released. This can be 
> identified by the following line of the stack trace:
> at 
> org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460)
> Which points to:
> {code:java}
> } else if (existingInternalConsumer.inUse) {
>   // If consumer is already cached but is currently in use, then return a new 
> consumer
>   NonCachedKafkaDataConsumer(newInternalConsumer)
> {code}
>  Meaning the existing consumer created for that `TopicPartition` is still in 
> use for some reason. The weird thing is that you can see this for very old 
> tasks which have already finished successfully.
> I've traced down this leak using file leak detector, attaching it to the 
> running Executor JVM process. I've emitted the list of open file descriptors 
> which [you can find 
> here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
>  and you can see that the majority of them are epoll FD used by Kafka 
> Consumers, indicating that they aren't closing.
>  Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation",
> sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination(){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to