Repository: spark
Updated Branches:
  refs/heads/master 3fb09afd5 -> 2c95e4e96


[SPARK-14455][STREAMING] Fix NPE in allocatedExecutors when calling in 
receiver-less scenario

## What changes were proposed in this pull request?

When calling `ReceiverTracker#allocatedExecutors` in a receiver-less scenario, 
an NPE will be thrown, since this `ReceiverTracker` is not actually started and 
its `endpoint` is not created.

This happens when using streaming dynamic allocation with direct 
Kafka.

## How was this patch tested?

Local integration testing was done.

Author: jerryshao <ss...@hortonworks.com>

Closes #12236 from jerryshao/SPARK-14455.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c95e4e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c95e4e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c95e4e9

Branch: refs/heads/master
Commit: 2c95e4e966b90d2a315350608d4b21b0381dfd11
Parents: 3fb09af
Author: jerryshao <ss...@hortonworks.com>
Authored: Sat Apr 9 23:34:14 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Sat Apr 9 23:34:14 2016 -0700

----------------------------------------------------------------------
 .../streaming/scheduler/ReceiverTracker.scala   | 12 +++++++---
 .../scheduler/ReceiverTrackerSuite.scala        | 23 +++++++++++++++++++-
 2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c95e4e9/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index d67f707..3b33a97 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -240,9 +240,15 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
    * Get the executors allocated to each receiver.
    * @return a map containing receiver ids to optional executor ids.
    */
-  def allocatedExecutors(): Map[Int, Option[String]] = {
-    endpoint.askWithRetry[Map[Int, 
ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
-      _.runningExecutor.map { _.executorId }
+  def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
+    if (isTrackerStarted) {
+      endpoint.askWithRetry[Map[Int, 
ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+        _.runningExecutor.map {
+          _.executorId
+        }
+      }
+    } else {
+      Map.empty
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2c95e4e9/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index 7654bb2..df122ac 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, 
SparkListenerTaskStart, TaskLo
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.{ConstantInputDStream, 
ReceiverInputDStream}
 import org.apache.spark.streaming.receiver._
 
 /** Testsuite for receiver scheduling */
@@ -102,6 +102,27 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       }
     }
   }
+
+  test("get allocated executors") {
+    // Test get allocated executors when 1 receiver is registered
+    withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc 
=>
+      val input = ssc.receiverStream(new TestReceiver)
+      val output = new TestOutputStream(input)
+      output.register()
+      ssc.start()
+      assert(ssc.scheduler.receiverTracker.allocatedExecutors().size === 1)
+    }
+
+    // Test get allocated executors when there's no receiver registered
+    withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc 
=>
+      val rdd = ssc.sc.parallelize(1 to 10)
+      val input = new ConstantInputDStream(ssc, rdd)
+      val output = new TestOutputStream(input)
+      output.register()
+      ssc.start()
+      assert(ssc.scheduler.receiverTracker.allocatedExecutors() === Map.empty)
+    }
+  }
 }
 
 /** An input DStream with for testing rate controlling */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to