Yuval Itzchakov created SPARK-21873:
---------------------------------------
Summary: CachedKafkaConsumer throws NonLocalReturnControl during fetching from Kafka
Key: SPARK-21873
URL: https://issues.apache.org/jira/browse/SPARK-21873
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 2.2.0, 2.1.1, 2.1.0
Reporter: Yuval Itzchakov
Priority: Minor
In Scala, using `return` inside a closure (a nested function value) causes a `NonLocalReturnControl`
exception to be thrown and caught in order to escape back to the enclosing method.
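A minimal, self-contained sketch of this mechanism (the names below are illustrative, not Spark code): the `return` sits inside the lambda passed to `foreach`, so the compiler implements it by throwing `scala.runtime.NonLocalReturnControl`, which the enclosing method catches.

{code:scala}
object NonLocalReturnDemo {
  // `return` here is non-local: it escapes the foreach lambda by
  // throwing NonLocalReturnControl, caught at the method boundary.
  def findFirstEven(xs: List[Int]): Option[Int] = {
    xs.foreach { x =>
      if (x % 2 == 0) return Some(x)
    }
    None
  }

  def main(args: Array[String]): Unit = {
    assert(findFirstEven(List(1, 3, 4)) == Some(4))
    assert(findFirstEven(List(1, 3, 5)) == None)
  }
}
{code}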
Profiling Structured Streaming in production clearly shows the cost:
!https://user-images.githubusercontent.com/3448320/29743366-4149ef78-8a99-11e7-94d6-f0cbb691134a.png!
This was captured during a 1-minute profiling session on a single executor. The offending code is:
{code:scala}
while (toFetchOffset != UNKNOWN_OFFSET) {
  try {
    return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
  } catch {
    case e: OffsetOutOfRangeException =>
      // When there is some error thrown, it's better to use a new consumer to drop all cached
      // states in the old consumer. We don't need to worry about the performance because this
      // is not a common path.
      resetConsumer()
      reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset", e)
      toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
  }
}
{code}
This happens because the method body is converted to a closure (a by-name argument), which is run inside:
{code:scala}
private def runUninterruptiblyIfPossible[T](body: => T): T
{code}
We should avoid using `return` in general, and here specifically, since this is a hot
path for applications consuming from Kafka.
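One `return`-free alternative is to carry the result in a local `Option` and make it part of the loop condition. The sketch below uses hypothetical stand-ins (`UNKNOWN_OFFSET`, `fetchData`, the `Either`-based failure signal) for the real consumer internals; it only demonstrates the control-flow pattern, not Spark's actual API.

{code:scala}
object FetchLoopSketch {
  val UNKNOWN_OFFSET: Long = -2L

  // Hypothetical stand-in for fetchData: fails (returning the next
  // offset to try) until the offset reaches a "valid" range.
  def fetchData(offset: Long): Either[Long, String] =
    if (offset >= 5L) Right(s"record@$offset") else Left(offset + 1)

  // Loop until a fetch succeeds or the offset becomes unknown,
  // storing the result locally instead of using `return`.
  def fetchLatest(startOffset: Long): Option[String] = {
    var toFetchOffset = startOffset
    var result: Option[String] = None
    while (result.isEmpty && toFetchOffset != UNKNOWN_OFFSET) {
      fetchData(toFetchOffset) match {
        case Right(record)    => result = Some(record)
        case Left(nextOffset) => toFetchOffset = nextOffset
      }
    }
    result
  }

  def main(args: Array[String]): Unit = {
    assert(fetchLatest(0L) == Some("record@5"))
    assert(fetchLatest(UNKNOWN_OFFSET) == None)
  }
}
{code}

Because no `return` crosses a closure boundary, no `NonLocalReturnControl` is ever allocated or thrown, which matters on a hot path like this.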
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]