Repository: spark
Updated Branches:
  refs/heads/branch-1.4 0be6e3b3e -> 130ec219a


[SPARK-7788] Made KinesisReceiver.onStart() non-blocking

KinesisReceiver calls worker.run(), which is a blocking call (a while loop) as 
per the source code of the kinesis-client library - 
https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java.
This results in an infinite loop while calling 
sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true), 
perhaps because ReceiverTracker is never able to register the receiver (its 
receiverInfo field is an empty map), causing it to be stuck in an infinite loop 
while waiting for the running flag to be set to false.

Author: Tathagata Das <[email protected]>

Closes #6348 from tdas/SPARK-7788 and squashes the following commits:

2584683 [Tathagata Das] Added receiver id in thread name
6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking

(cherry picked from commit 1c388a9985999e043fa002618a357bc8f0a8b65a)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/130ec219
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/130ec219
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/130ec219

Branch: refs/heads/branch-1.4
Commit: 130ec219aa40cd8cebf4105053d4c92d840e127e
Parents: 0be6e3b
Author: Tathagata Das <[email protected]>
Authored: Fri May 22 17:39:01 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Fri May 22 17:39:09 2015 -0700

----------------------------------------------------------------------
 .../streaming/kinesis/KinesisReceiver.scala     | 30 ++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/130ec219/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 800202e..7dd8bfd 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -18,6 +18,8 @@ package org.apache.spark.streaming.kinesis
 
 import java.util.UUID
 
+import scala.util.control.NonFatal
+
 import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, 
BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
 import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, 
IRecordProcessorFactory}
 import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream,
 KinesisClientLibConfiguration, Worker}
@@ -98,6 +100,9 @@ private[kinesis] class KinesisReceiver(
    */
   private var worker: Worker = null
 
+  /** Thread running the worker */
+  private var workerThread: Thread = null
+
   /**
    * This is called when the KinesisReceiver starts and must be non-blocking.
    * The KCL creates and manages the receiving/processing thread pool through 
Worker.run().
@@ -126,8 +131,19 @@ private[kinesis] class KinesisReceiver(
     }
 
     worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
-    worker.run()
-
+    workerThread = new Thread() {
+      override def run(): Unit = {
+        try {
+          worker.run()
+        } catch {
+          case NonFatal(e) =>
+            restart("Error running the KCL worker in Receiver", e)
+        }
+      }
+    }
+    workerThread.setName(s"Kinesis Receiver ${streamId}")
+    workerThread.setDaemon(true)
+    workerThread.start()
     logInfo(s"Started receiver with workerId $workerId")
   }
 
@@ -137,10 +153,14 @@ private[kinesis] class KinesisReceiver(
    * The KCL will do its best to drain and checkpoint any in-flight records 
upon shutdown.
    */
   override def onStop() {
-    if (worker != null) {
-      worker.shutdown()
+    if (workerThread != null) {
+      if (worker != null) {
+        worker.shutdown()
+        worker = null
+      }
+      workerThread.join()
+      workerThread = null
       logInfo(s"Stopped receiver for workerId $workerId")
-      worker = null
     }
     workerId = null
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to