[ 
https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuval Itzchakov updated SPARK-24987:
------------------------------------
    Environment: 
Spark 2.3.1

Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
 Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)

 

  was:
Spark 2.3.1

Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
 Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)

Spark graph:
{code:java}
kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap {...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation",
sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()
{code}


> Kafka Cached Consumer Leaking Consumers
> ---------------------------------------
>
>                 Key: SPARK-24987
>                 URL: https://issues.apache.org/jira/browse/SPARK-24987
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0, 2.3.1
>         Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>  
>            Reporter: Yuval Itzchakov
>            Priority: Critical
>
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers 
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
>  via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, haven't 
> been able to find the root cause as of yet) where cached consumers remain "in 
> use" throughout the life time of the task, perhaps the registered callback on 
> the context is never been called (just a theory, no hard evidence):
> {code:java}
> context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
> {code}
> I've traced down this leak using file leak detector, attaching it to the 
> running Executor JVM process. I've emitted the list of open file descriptors 
> which [you can find 
> here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
>  and you can see that the majority of them are epoll FD used by Kafka 
> Consumers, indicating that they aren't closing.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to