Repository: spark
Updated Branches:
  refs/heads/master 596681794 -> 67582132b


[SPARK-11063] [STREAMING] Change preferredLocations of Receiver's RDD to hosts 
rather than hostports

An RDD's preferredLocations must be hostnames, but the Streaming Receiver's 
scheduled executors are in host:port format, so the preferred locations never take effect.

This PR converts `scheduledExecutors` from host:port strings to hostnames before creating the Receiver's RDD.

Author: zsxwing <[email protected]>

Closes #9075 from zsxwing/SPARK-11063.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67582132
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67582132
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67582132

Branch: refs/heads/master
Commit: 67582132bffbaaeaadc5cf8218f6239d03c39da0
Parents: 5966817
Author: zsxwing <[email protected]>
Authored: Mon Oct 19 15:35:14 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Mon Oct 19 15:35:14 2015 -0700

----------------------------------------------------------------------
 .../scheduler/ReceiverSchedulingPolicy.scala    |  3 ++-
 .../streaming/scheduler/ReceiverTracker.scala   |  4 +++-
 .../scheduler/ReceiverTrackerSuite.scala        | 24 ++++++++++++++++++++
 3 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/67582132/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index 10b5a7f..d2b0be7 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -21,6 +21,7 @@ import scala.collection.Map
 import scala.collection.mutable
 
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
 
 /**
  * A class that tries to schedule receivers with evenly distributed. There are 
two phases for
@@ -79,7 +80,7 @@ private[streaming] class ReceiverSchedulingPolicy {
       return receivers.map(_.streamId -> Seq.empty).toMap
     }
 
-    val hostToExecutors = executors.groupBy(_.split(":")(0))
+    val hostToExecutors = executors.groupBy(executor => 
Utils.parseHostPort(executor)._1)
     val scheduledExecutors = Array.fill(receivers.length)(new 
mutable.ArrayBuffer[String])
     val numReceiversOnExecutor = mutable.HashMap[String, Int]()
     // Set the initial value to 0

http://git-wip-us.apache.org/repos/asf/spark/blob/67582132/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index d053e9e..2ce80d6 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -551,7 +551,9 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
         if (scheduledExecutors.isEmpty) {
           ssc.sc.makeRDD(Seq(receiver), 1)
         } else {
-          ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
+          val preferredLocations =
+            scheduledExecutors.map(hostPort => 
Utils.parseHostPort(hostPort)._1).distinct
+          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
         }
       receiverRDD.setName(s"Receiver $receiverId")
       ssc.sparkContext.setJobDescription(s"Streaming job running receiver 
$receiverId")

http://git-wip-us.apache.org/repos/asf/spark/blob/67582132/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index 45138b7..fda86ae 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, 
TaskLocality}
+import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -80,6 +82,28 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       }
     }
   }
+
+  test("SPARK-11063: TaskSetManager should use Receiver RDD's 
preferredLocations") {
+    // Use ManualClock to prevent from starting batches so that we can make 
sure the only task is
+    // for starting the Receiver
+    val _conf = conf.clone.set("spark.streaming.clock", 
"org.apache.spark.util.ManualClock")
+    withStreamingContext(new StreamingContext(_conf, Milliseconds(100))) { ssc 
=>
+      @volatile var receiverTaskLocality: TaskLocality = null
+      ssc.sparkContext.addSparkListener(new SparkListener {
+        override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+          receiverTaskLocality = taskStart.taskInfo.taskLocality
+        }
+      })
+      val input = ssc.receiverStream(new TestReceiver)
+      val output = new TestOutputStream(input)
+      output.register()
+      ssc.start()
+      eventually(timeout(10 seconds), interval(10 millis)) {
+        // If preferredLocations is set correctly, receiverTaskLocality should 
be NODE_LOCAL
+        assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
+      }
+    }
+  }
 }
 
 /** An input DStream with for testing rate controlling */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to