[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuval Itzchakov updated SPARK-24987:
------------------------------------
    Description: 
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers 
(https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
 via KafkaDataConsumer.acquire.
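
To make the acquire/release contract concrete, here is a minimal toy model of a consumer cache. It is a sketch under assumptions, not Spark's actual KafkaDataConsumer code: the class `ConsumerCacheSketch`, its `Entry` type, and the counters are hypothetical names introduced only for illustration, and no real Kafka consumer is created. It assumes that a busy cached entry causes a fresh, non-cached entry to be handed out.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a per-key consumer cache. Hypothetical names throughout;
// this is NOT Spark's KafkaDataConsumer implementation.
final class ConsumerCacheSketch {

    /** Stand-in for a consumer; it holds no sockets or file descriptors. */
    static final class Entry {
        final String key;
        boolean inUse = true;   // entries are handed out already marked "in use"
        boolean closed = false;
        Entry(String key) { this.key = key; }
    }

    private final Map<String, Entry> cache = new HashMap<>();
    private int created = 0;
    private int open = 0;

    /** Reuses the cached entry when it is idle; otherwise creates a new one. */
    synchronized Entry acquire(String key) {
        Entry cached = cache.get(key);
        if (cached != null && !cached.inUse) {
            cached.inUse = true;
            return cached;
        }
        Entry fresh = new Entry(key);
        created++;
        open++;
        if (cached == null) {
            cache.put(key, fresh); // the first entry for a key becomes the cached one
        }
        return fresh;
    }

    /** Marks the entry idle; entries that are not the cached instance are closed. */
    synchronized void release(Entry e) {
        if (e.closed) {
            return;
        }
        e.inUse = false;
        if (cache.get(e.key) != e) {
            e.closed = true;
            open--;
        }
    }

    synchronized int createdCount() { return created; }
    synchronized int openCount() { return open; }

    public static void main(String[] args) {
        // Every acquire is paired with a release: one entry is created and reused.
        ConsumerCacheSketch balanced = new ConsumerCacheSketch();
        for (int i = 0; i < 3; i++) {
            balanced.release(balanced.acquire("topic-0"));
        }
        System.out.println("balanced: created=" + balanced.createdCount()
                + " open=" + balanced.openCount());

        // release is never called: every acquire creates another open entry.
        ConsumerCacheSketch leaky = new ConsumerCacheSketch();
        for (int i = 0; i < 3; i++) {
            leaky.acquire("topic-0");
        }
        System.out.println("leaky: created=" + leaky.createdCount()
                + " open=" + leaky.openCount());
    }
}
```

In this model, three paired acquire/release calls leave one open entry, while three acquires without a release leave three. If something similar happens with real consumers, each unreleased one would keep its sockets and file descriptors open.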

It seems that there are situations (I've been trying to debug this but haven't 
yet found the root cause) where cached consumers remain "in use" throughout 
the lifetime of the task and are never released. This can be identified by the 
following line of the stack trace:

 

 

I tracked down this leak using file leak detector, attaching it to the 
running Executor JVM process. I dumped the list of open file descriptors, 
which [you can find 
here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d]. 
The majority of them are epoll FDs used by Kafka consumers, indicating that 
the consumers aren't being closed.
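
As a rough cross-check that does not need file leak detector, the epoll descriptors of a process can be counted from /proc on Linux, where an epoll instance appears as a symlink to `anon_inode:[eventpoll]`. The sketch below is an assumption-laden illustration, not the method used to produce the gist above: the class `EpollFdCount` and its method names are hypothetical, and it only works where /proc/<pid>/fd is readable by the current user.

```java
import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: counts epoll file descriptors of a Linux process via /proc.
final class EpollFdCount {

    /** Counts link targets that look like epoll instances ("anon_inode:[eventpoll]"). */
    static long countEpoll(List<String> linkTargets) {
        return linkTargets.stream().filter(t -> t.contains("eventpoll")).count();
    }

    /** Resolves the symlinks under /proc/<pid>/fd; returns an empty list if unavailable. */
    static List<String> readFdTargets(String pid) {
        List<String> targets = new ArrayList<>();
        Path fdDir = Paths.get("/proc", pid, "fd");
        if (!Files.isDirectory(fdDir)) {
            return targets; // not Linux, or the process does not exist
        }
        try (DirectoryStream<Path> fds = Files.newDirectoryStream(fdDir)) {
            for (Path fd : fds) {
                try {
                    targets.add(Files.readSymbolicLink(fd).toString());
                } catch (IOException | RuntimeException e) {
                    // The descriptor was closed between listing and reading; skip it.
                }
            }
        } catch (IOException | DirectoryIteratorException e) {
            // Listing failed (for example, insufficient permissions); return what we have.
        }
        return targets;
    }

    public static void main(String[] args) {
        // Pass the executor's PID as the first argument; defaults to this JVM.
        String pid = args.length > 0 ? args[0] : "self";
        List<String> targets = readFdTargets(pid);
        System.out.println("open fds: " + targets.size()
                + ", epoll fds: " + countEpoll(targets));
    }
}
```

Running this periodically against the executor PID should show whether the epoll count keeps growing across batches, which would be consistent with consumers not being closed.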

 Spark graph:
{code:java}
kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap {...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .writeStream
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()
{code}

  was:
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers 
(https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
 via KafkaDataConsumer.acquire.

It seems that there are situations (I've been trying to debug this but haven't 
yet found the root cause) where cached consumers remain "in use" throughout 
the lifetime of the task. Perhaps the registered callback on the context is 
never called (just a theory, no hard evidence):
{code:java}
context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
{code}
I tracked down this leak using file leak detector, attaching it to the 
running Executor JVM process. I dumped the list of open file descriptors, 
which [you can find 
here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d]. 
The majority of them are epoll FDs used by Kafka consumers, indicating that 
the consumers aren't being closed.

 Spark graph:
{code:java}
kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap {...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .writeStream
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()
{code}


> Kafka Cached Consumer Leaking Consumers
> ---------------------------------------
>
>                 Key: SPARK-24987
>                 URL: https://issues.apache.org/jira/browse/SPARK-24987
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0, 2.3.1
>         Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>  
>            Reporter: Yuval Itzchakov
>            Priority: Critical
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
