Yuval Itzchakov created SPARK-24987:
---------------------------------------
Summary: Kafka Cached Consumer Leaking Consumers
Key: SPARK-24987
URL: https://issues.apache.org/jira/browse/SPARK-24987
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 2.3.1, 2.3.0
Environment: Spark 2.3.1
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
Spark graph:
```scala
kafkaStream
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
.flatMap {...}
.groupByKey(...)
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
.foreach(...)
.outputMode(OutputMode.Update)
.option("checkpointLocation",
sparkConfiguration.properties.checkpointDirectory)
.start()
.awaitTermination()
```
Reporter: Yuval Itzchakov
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers
(https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
via KafkaDataConsumer.acquire.
There appear to be situations (I have been trying to debug this but have not
yet found the root cause) in which cached consumers remain "in use" for the
lifetime of the task. Perhaps the callback registered on the task context is
never called (just a theory, no hard evidence):
```scala
context.addTaskCompletionListener { _ =>
underlying.closeIfNeeded()
}
```
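To make the theory concrete, here is a toy model of an "in use" consumer cache. It is only a sketch and not Spark's actual `KafkaDataConsumer` code: `FakeConsumer`, `ToyConsumerCache` and `LeakDemo` are hypothetical names, and the reuse policy is an assumption made for illustration. It shows how consumers accumulate if the release step never runs.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Kafka consumer; not a Spark or Kafka class.
final class FakeConsumer(val id: Int) {
  var closed = false
  def close(): Unit = closed = true
}

// Toy cache keyed by a topic-partition name. A cached entry is reused only
// when it is not marked in use; otherwise an extra, uncached consumer is made.
final class ToyConsumerCache {
  private case class Entry(consumer: FakeConsumer, var inUse: Boolean)
  private val cache = mutable.Map.empty[String, Entry]
  private var nextId = 0
  val created = mutable.ArrayBuffer.empty[FakeConsumer]

  private def newConsumer(): FakeConsumer = {
    val c = new FakeConsumer(nextId)
    nextId += 1
    created += c
    c
  }

  def acquire(key: String): FakeConsumer = synchronized {
    cache.get(key) match {
      case Some(e) if !e.inUse =>
        e.inUse = true
        e.consumer
      case Some(_) =>
        newConsumer() // cached consumer is busy: hand out an uncached one
      case None =>
        val c = newConsumer()
        cache(key) = Entry(c, inUse = true)
        c
    }
  }

  // Plays the role of the task-completion callback in this toy model.
  def release(key: String, c: FakeConsumer): Unit = synchronized {
    cache.get(key) match {
      case Some(e) if e.consumer eq c => e.inUse = false // back to the cache
      case _                          => c.close()       // uncached: close it
    }
  }
}

object LeakDemo {
  // Runs three simulated tasks on one partition and returns how many
  // consumers are still open afterwards.
  def openAfter(release: Boolean): Int = {
    val cache = new ToyConsumerCache
    (1 to 3).foreach { _ =>
      val c = cache.acquire("topic-0")
      if (release) cache.release("topic-0", c)
    }
    cache.created.count(!_.closed)
  }
}
```

In this model, `LeakDemo.openAfter(release = true)` leaves a single open consumer (the cached one), while `LeakDemo.openAfter(release = false)` leaves one open consumer per simulated task, which is the growth pattern described in this report.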
I traced this leak using File Leak Detector, attached to the running Executor
JVM process. I dumped the list of open file descriptors, which [you can find
here](https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d).
The majority of them are epoll FDs used by Kafka consumers, indicating that the
consumers are not being closed.
The growth is not immediate, but the number of open FDs clearly increases over
time. This is a snapshot after running the load test for about 5 hours:
!image-2018-08-01-13-13-16-339.png!
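For anyone trying to reproduce the descriptor growth without attaching an external tool, the sketch below counts the current JVM's open descriptors by kind. It assumes a Linux executor where `/proc/self/fd` is readable and where epoll descriptors show up as `anon_inode:[eventpoll]` links. `FdSnapshot` and its methods are hypothetical helper names and were not part of the original investigation.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._
import scala.util.Try

object FdSnapshot {
  // Buckets a /proc/<pid>/fd symlink target into a coarse category.
  def classify(target: String): String =
    if (target.contains("eventpoll")) "epoll"
    else if (target.startsWith("socket:")) "socket"
    else if (target.startsWith("pipe:")) "pipe"
    else "file/other"

  // Counts open descriptors of the current process by category.
  // Returns an empty map when the directory does not exist (non-Linux hosts).
  def countByKind(fdDir: Path = Paths.get("/proc/self/fd")): Map[String, Int] =
    if (!Files.isDirectory(fdDir)) Map.empty
    else {
      val stream = Files.list(fdDir)
      try {
        stream.iterator().asScala
          // A descriptor may disappear while we iterate; skip those entries.
          .flatMap(p => Try(Files.readSymbolicLink(p).toString).toOption)
          .toVector
          .groupBy(classify)
          .map { case (kind, targets) => kind -> targets.size }
      } finally stream.close()
    }
}
```

Logging `FdSnapshot.countByKind()` periodically from the executor (for example from inside the `flatMap` function) would show whether the `epoll` count keeps rising between batches.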