Repository: spark Updated Branches: refs/heads/master 3fb09afd5 -> 2c95e4e96
[SPARK-14455][STREAMING] Fix NPE in allocatedExecutors when calling in receiver-less scenario ## What changes were proposed in this pull request? When `ReceiverTracker#allocatedExecutors` is called in a receiver-less scenario, an NPE is thrown, because the `ReceiverTracker` is never actually started and its `endpoint` is never created. This happens when using streaming dynamic allocation with direct Kafka. ## How was this patch tested? A local integration test was run. Author: jerryshao <ss...@hortonworks.com> Closes #12236 from jerryshao/SPARK-14455. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c95e4e9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c95e4e9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c95e4e9 Branch: refs/heads/master Commit: 2c95e4e966b90d2a315350608d4b21b0381dfd11 Parents: 3fb09af Author: jerryshao <ss...@hortonworks.com> Authored: Sat Apr 9 23:34:14 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Sat Apr 9 23:34:14 2016 -0700 ---------------------------------------------------------------------- .../streaming/scheduler/ReceiverTracker.scala | 12 +++++++--- .../scheduler/ReceiverTrackerSuite.scala | 23 +++++++++++++++++++- 2 files changed, 31 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2c95e4e9/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index d67f707..3b33a97 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -240,9 +240,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false * Get the executors allocated to each receiver. * @return a map containing receiver ids to optional executor ids. */ - def allocatedExecutors(): Map[Int, Option[String]] = { - endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues { - _.runningExecutor.map { _.executorId } + def allocatedExecutors(): Map[Int, Option[String]] = synchronized { + if (isTrackerStarted) { + endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues { + _.runningExecutor.map { + _.executorId + } + } + } else { + Map.empty } } http://git-wip-us.apache.org/repos/asf/spark/blob/2c95e4e9/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 7654bb2..df122ac 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLo import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.dstream.{ConstantInputDStream, ReceiverInputDStream} import org.apache.spark.streaming.receiver._ /** Testsuite for receiver scheduling */ @@ -102,6 +102,27 @@ class ReceiverTrackerSuite extends TestSuiteBase { } } } + + test("get allocated executors") { + // 
Test get allocated executors when 1 receiver is registered + withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => + val input = ssc.receiverStream(new TestReceiver) + val output = new TestOutputStream(input) + output.register() + ssc.start() + assert(ssc.scheduler.receiverTracker.allocatedExecutors().size === 1) + } + + // Test get allocated executors when there's no receiver registered + withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => + val rdd = ssc.sc.parallelize(1 to 10) + val input = new ConstantInputDStream(ssc, rdd) + val output = new TestOutputStream(input) + output.register() + ssc.start() + assert(ssc.scheduler.receiverTracker.allocatedExecutors() === Map.empty) + } + } } /** An input DStream with for testing rate controlling */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org