Repository: spark
Updated Branches:
  refs/heads/master d4895c9de -> 8f0df6bc1


[SPARK-21873][SS] - Avoid using `return` inside `CachedKafkaConsumer.get`

During profiling of a structured streaming application with Kafka as the 
source, I came across this exception:

![Structured Streaming Kafka Exceptions](https://user-images.githubusercontent.com/3448320/29743366-4149ef78-8a99-11e7-94d6-f0cbb691134a.png)

This is a 1-minute sample, during which 106K `NonLocalReturnControl` exceptions 
were thrown.
This happens because `CachedKafkaConsumer.get` is run inside:

`private def runUninterruptiblyIfPossible[T](body: => T): T`

Here `body: => T` is the body of the `get` method. Because that body is compiled 
into a function (a closure), a `return` inside it cannot simply exit `get`. To 
escape the `while` loop defined in `get`, the runtime throws the above exception 
and catches it in the enclosing method.
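
A minimal sketch of the mechanism, assuming Scala 2 semantics. The names `runBody`, `firstPositive`, and `thrown` are hypothetical and only for illustration; `runBody` stands in for `runUninterruptiblyIfPossible` and counts the control exception before rethrowing it so the enclosing method can still complete the return.

```scala
import scala.runtime.NonLocalReturnControl

object NonLocalReturnDemo {
  // Counts how many control exceptions passed through runBody (illustration only).
  var thrown = 0

  // Hypothetical stand-in for runUninterruptiblyIfPossible: evaluates a by-name body.
  def runBody[T](body: => T): T = {
    try body
    catch {
      case e: NonLocalReturnControl[_] =>
        thrown += 1
        throw e // rethrow so the enclosing method can finish the return
    }
  }

  // The block passed to runBody is a closure, so `return` inside it is a
  // non-local return: it is implemented by throwing NonLocalReturnControl.
  def firstPositive(xs: Seq[Int]): Int = runBody {
    var i = 0
    while (i < xs.length) {
      if (xs(i) > 0) return xs(i)
      i += 1
    }
    -1
  }
}
```

Each call that exits through `return` throws one exception, which is consistent with the large counts seen in the profiler when `get` is called per record.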

## What changes were proposed in this pull request?

Instead of using `return` (which is generally not recommended in Scala), we 
store the result of the `fetchData` method in a local variable and use a 
boolean flag to record whether the fetch succeeded. The flag is checked in the 
`while` loop's condition.
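
The pattern can be sketched in isolation as follows. This is a simplified illustration, not the Spark code: `runBody`, `UnknownOffset`, the `fetch` and `nextOffset` parameters, and the use of `IllegalStateException` in place of `OffsetOutOfRangeException` are all assumptions made to keep the example self-contained.

```scala
object FlagLoopDemo {
  // Hypothetical sentinel, playing the role of UNKNOWN_OFFSET.
  val UnknownOffset = -1L

  // Hypothetical stand-in for runUninterruptiblyIfPossible.
  def runBody[T](body: => T): T = body

  // `fetch` may throw when the offset is unavailable; `nextOffset` picks the
  // next offset to try, or UnknownOffset when there is nothing left to retry.
  def get(offset: Long, fetch: Long => String, nextOffset: Long => Long): String = runBody {
    var toFetchOffset = offset
    var record: String = null
    var isFetchComplete = false

    // The flag ends the loop on success, so no `return` is needed in the closure.
    while (toFetchOffset != UnknownOffset && !isFetchComplete) {
      try {
        record = fetch(toFetchOffset)
        isFetchComplete = true
      } catch {
        case _: IllegalStateException =>
          toFetchOffset = nextOffset(toFetchOffset)
      }
    }

    if (isFetchComplete) record else null
  }
}
```

Because the closure now ends with an ordinary expression, its value is returned normally and no control exception is thrown.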

## How was this patch tested?

I ran the `KafkaSourceSuite` to make sure there are no regressions. Since the 
exception isn't visible from user code, there is no way (at least that I could 
think of) to add a test for it to the existing suite.

Author: Yuval Itzchakov <[email protected]>

Closes #19059 from YuvalItzchakov/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f0df6bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f0df6bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f0df6bc

Branch: refs/heads/master
Commit: 8f0df6bc1092c0c75b41e91e4ffc41a5525c8274
Parents: d4895c9
Author: Yuval Itzchakov <[email protected]>
Authored: Wed Aug 30 10:33:23 2017 +0100
Committer: Sean Owen <[email protected]>
Committed: Wed Aug 30 10:33:23 2017 +0100

----------------------------------------------------------------------
 .../spark/sql/kafka010/CachedKafkaConsumer.scala | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8f0df6bc/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 7c4f38e..90ed7b1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -112,9 +112,15 @@ private[kafka010] case class CachedKafkaConsumer private(
     // we will move to the next available offset within `[offset, untilOffset)` and retry.
     // If `failOnDataLoss` is `true`, the loop body will be executed only once.
     var toFetchOffset = offset
-    while (toFetchOffset != UNKNOWN_OFFSET) {
+    var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    // We want to break out of the while loop on a successful fetch to avoid using "return"
+    // which may causes a NonLocalReturnControl exception when this method is used as a function.
+    var isFetchComplete = false
+
+    while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
       try {
-        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        isFetchComplete = true
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -125,8 +131,13 @@ private[kafka010] case class CachedKafkaConsumer private(
           toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
       }
     }
-    resetFetchedData()
-    null
+
+    if (isFetchComplete) {
+      consumerRecord
+    } else {
+      resetFetchedData()
+      null
+    }
   }
 
   /**

