Repository: spark

Updated Branches:
  refs/heads/master d4895c9de -> 8f0df6bc1
[SPARK-21873][SS] - Avoid using `return` inside `CachedKafkaConsumer.get`

During profiling of a structured streaming application with Kafka as the source, I came across a large number of `NonLocalReturnControl` exceptions: a 1 minute sample caused 106K of them to be thrown.

This happens because `CachedKafkaConsumer.get` is run inside `private def runUninterruptiblyIfPossible[T](body: => T): T`, where `body: => T` is the `get` method. Passing the method as a by-name argument means that, in order to escape the `while` loop defined in `get`, the runtime has to implement the `return` by throwing the above exception.

## What changes were proposed in this pull request?

Instead of using `return` (which is generally not recommended in Scala), we place the result of the `fetchData` method in a local variable and use a boolean flag to indicate whether the fetch completed, which serves as the predicate of the `while` loop.

## How was this patch tested?

I've run the `KafkaSourceSuite` to make sure there are no regressions. Since the exception isn't visible from user code, there is no way (at least that I could think of) to add this as a test to the existing suite.

Author: Yuval Itzchakov <[email protected]>

Closes #19059 from YuvalItzchakov/master.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f0df6bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f0df6bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f0df6bc

Branch: refs/heads/master
Commit: 8f0df6bc1092c0c75b41e91e4ffc41a5525c8274
Parents: d4895c9
Author: Yuval Itzchakov <[email protected]>
Authored: Wed Aug 30 10:33:23 2017 +0100
Committer: Sean Owen <[email protected]>
Committed: Wed Aug 30 10:33:23 2017 +0100

----------------------------------------------------------------------
 .../spark/sql/kafka010/CachedKafkaConsumer.scala | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0df6bc/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 7c4f38e..90ed7b1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -112,9 +112,15 @@ private[kafka010] case class CachedKafkaConsumer private(
     // we will move to the next available offset within `[offset, untilOffset)` and retry.
     // If `failOnDataLoss` is `true`, the loop body will be executed only once.
     var toFetchOffset = offset
-    while (toFetchOffset != UNKNOWN_OFFSET) {
+    var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    // We want to break out of the while loop on a successful fetch to avoid using "return"
+    // which may cause a NonLocalReturnControl exception when this method is used as a function.
+    var isFetchComplete = false
+
+    while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
       try {
-        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        isFetchComplete = true
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -125,8 +131,13 @@ private[kafka010] case class CachedKafkaConsumer private(
         toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
       }
     }
-    resetFetchedData()
-    null
+
+    if (isFetchComplete) {
+      consumerRecord
+    } else {
+      resetFetchedData()
+      null
+    }
   }
 
   /**
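For context on why the `return` mattered, here is a minimal standalone sketch (not Spark code; `runBody`, `findFirstEvenWithReturn`, and `findFirstEvenWithFlag` are made-up names standing in for `runUninterruptiblyIfPossible` and `get`). It shows how Scala 2 implements a `return` from inside a by-name argument by throwing `scala.runtime.NonLocalReturnControl`, and how the same early exit can be expressed with a local result variable and a boolean flag, as this patch does:

```scala
object NonLocalReturnDemo {

  // Stand-in for a method that evaluates its body as a by-name parameter,
  // similar in shape to `runUninterruptiblyIfPossible[T](body: => T): T`.
  def runBody[T](body: => T): T = body

  // The `return` below sits inside the thunk passed to `runBody`, so the compiler
  // implements it by throwing scala.runtime.NonLocalReturnControl, which is caught
  // at the boundary of findFirstEvenWithReturn. One exception per early exit.
  def findFirstEvenWithReturn(xs: Seq[Int]): Option[Int] = {
    runBody {
      for (x <- xs) {
        if (x % 2 == 0) return Some(x) // non-local return: thrown as an exception at runtime
      }
      None
    }
  }

  // The shape used by the patch instead: keep the result in a local variable and use a
  // boolean flag as part of the loop predicate, so no exception is needed to exit early.
  def findFirstEvenWithFlag(xs: Seq[Int]): Option[Int] = {
    runBody {
      var result: Option[Int] = None
      var done = false
      val it = xs.iterator
      while (it.hasNext && !done) {
        val x = it.next()
        if (x % 2 == 0) {
          result = Some(x)
          done = true
        }
      }
      result
    }
  }

  def main(args: Array[String]): Unit = {
    println(findFirstEvenWithReturn(Seq(1, 3, 4, 5))) // Some(4)
    println(findFirstEvenWithFlag(Seq(1, 3, 4, 5)))   // Some(4)
  }
}
```

Both functions return the same result; the difference is only in how the early exit is compiled, which is why the original `return fetchData(...)` showed up as 106K `NonLocalReturnControl` throws per minute under profiling.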
