[ 
https://issues.apache.org/jira/browse/SPARK-21873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuval Itzchakov updated SPARK-21873:
------------------------------------
    Description: 
In Scala, using `return` inside a closure or by-name block causes a 
`NonLocalReturnControl` exception to be thrown and then caught by the 
enclosing method in order to exit it.
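
The mechanism can be reproduced outside Spark. The following is a minimal sketch, assuming Scala 2 semantics; `runWrapped`, `firstPositive` and `sawControlThrowable` are hypothetical names used only for illustration and are not Spark code. The wrapper observes the control-flow throwable that the `return` produces and rethrows it unchanged.

```scala
import scala.util.control.ControlThrowable

object NonLocalReturnDemo {
  // Set to true when a control-flow throwable passes through runWrapped.
  var sawControlThrowable = false

  // Hypothetical wrapper with a by-name parameter. It only observes the
  // throwable and rethrows it, so the enclosing method can still catch it.
  def runWrapped[T](body: => T): T =
    try body
    catch {
      case t: ControlThrowable =>
        sawControlThrowable = true
        throw t
    }

  // `return` sits inside the by-name block, so it is compiled to a throw of
  // NonLocalReturnControl that firstPositive itself catches.
  def firstPositive(xs: Seq[Int]): Int = runWrapped {
    var i = 0
    while (i < xs.length) {
      if (xs(i) > 0) return xs(i)
      i += 1
    }
    -1
  }

  def main(args: Array[String]): Unit = {
    println(firstPositive(Seq(-1, 2, 3)))
    println(sawControlThrowable)
  }
}
```

Running `main` should print `2` followed by `true`: the value is returned correctly, but only by way of a throw and catch on every call.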

While profiling Structured Streaming in production, these exceptions show up 
clearly:

!https://user-images.githubusercontent.com/3448320/29743366-4149ef78-8a99-11e7-94d6-f0cbb691134a.png!

This was captured during a 1-minute profiling session on a single executor. The 
code is:

{code:java}
while (toFetchOffset != UNKNOWN_OFFSET) {
  try {
    return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
  } catch {
    case e: OffsetOutOfRangeException =>
      // When there is some error thrown, it's better to use a new consumer to drop all cached
      // states in the old consumer. We don't need to worry about the performance because this
      // is not a common path.
      resetConsumer()
      reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset", e)
      toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
  }
}
{code}

This happens because the method body is passed as a by-name argument, and so 
compiled into a function, which is run inside:

{code:java}
private def runUninterruptiblyIfPossible[T](body: => T): T
{code}

We should avoid using `return` in general, and here specifically as it is a hot 
path for applications using Kafka.
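
One possible shape for a loop without `return` is sketched below. It is an illustration under stated assumptions, not the change made in Spark: `OffsetOutOfRange`, `UnknownOffset`, `runWrapped`, the two-argument `fetchData` and the `Option` result are all hypothetical stand-ins, and what the real method does after the loop is not shown in this issue.

```scala
object FetchWithoutReturn {
  // Hypothetical stand-ins for the Kafka exception and the sentinel offset.
  final class OffsetOutOfRange(msg: String) extends RuntimeException(msg)
  val UnknownOffset: Long = -1L

  // Hypothetical wrapper with the same by-name shape as the one in the issue.
  def runWrapped[T](body: => T): T = body

  // Toy fetch: fails for any offset below the earliest available one.
  def fetchData(offset: Long, earliest: Long): String =
    if (offset < earliest) throw new OffsetOutOfRange(s"offset $offset")
    else s"record@$offset"

  // The result is stored in a local variable and the loop condition ends the
  // loop, so no `return` (and no control-flow exception) is needed.
  def get(offset: Long, earliest: Long): Option[String] = runWrapped {
    var toFetchOffset = offset
    var result: Option[String] = None
    while (result.isEmpty && toFetchOffset != UnknownOffset) {
      try {
        result = Some(fetchData(toFetchOffset, earliest))
      } catch {
        case _: OffsetOutOfRange =>
          // Recover by retrying from the earliest available offset.
          toFetchOffset = earliest
      }
    }
    result
  }
}
```

The only exception left on this path is the out-of-range case, which the original comment already describes as uncommon.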


  was:
Identical to the description above, except that both code blocks were tagged 
code:scala rather than code:java.



> CachedKafkaConsumer throws NonLocalReturnControl during fetching from Kafka
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-21873
>                 URL: https://issues.apache.org/jira/browse/SPARK-21873
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0, 2.1.1, 2.2.0
>            Reporter: Yuval Itzchakov
>            Priority: Minor
>   Original Estimate: 0h
>  Remaining Estimate: 0h


