Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4131623a8 -> e8923d21d


[SPARK-17999][KAFKA][SQL] Add getPreferredLocations for KafkaSourceRDD

## What changes were proposed in this pull request?

The newly implemented Structured Streaming `KafkaSource` calculates a preferred location for each topic partition, but `KafkaSourceRDD` does not expose this information through the RDD `getPreferredLocations` method. This patch overrides `getPreferredLocations` in `KafkaSourceRDD` so that the scheduler can use the precomputed location of each partition.

## How was this patch tested?

Manual verification.

Author: jerryshao <[email protected]>

Closes #15545 from jerryshao/SPARK-17999.

(cherry picked from commit 947f4f25273161dc4719419a35613a71c2e2a150)
Signed-off-by: Shixiong Zhu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8923d21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8923d21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8923d21

Branch: refs/heads/branch-2.0
Commit: e8923d21dd9f230e0ac23582033442e6fe476611
Parents: 4131623
Author: jerryshao <[email protected]>
Authored: Thu Oct 20 10:50:34 2016 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Thu Oct 20 10:50:42 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e8923d21/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index 496af7e..802dd04 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -112,6 +112,11 @@ private[kafka010] class KafkaSourceRDD(
     buf.toArray
   }
 
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val part = split.asInstanceOf[KafkaSourceRDDPartition]
+    part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)
+  }
+
   override def compute(
       thePart: Partition,
       context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
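
The diff only surfaces a location that was already computed; it does not show how `KafkaSource` picks `preferredLoc`. The standalone sketch below (no Spark dependency) illustrates both halves under stated assumptions: `OffsetRangeSketch`, `PreferredLocSketch` and `assignPreferredLoc` are hypothetical names, and the hash-based assignment of each topic partition to one host from a sorted host list is an assumed strategy, not code from this commit. `preferredLocations` reproduces the behavior of the patched method body.

```scala
import java.lang.Math.floorMod

// Hypothetical stand-in for the offset range carried by each KafkaSourceRDD
// partition; only the fields this sketch needs are modeled.
final case class OffsetRangeSketch(
    topic: String,
    partition: Int,
    preferredLoc: Option[String])

object PreferredLocSketch {

  // Assumed assignment strategy (not taken from this commit): sort the hosts so
  // the order is the same in every batch, then map each topic partition to one
  // host by hash. Returns None when no hosts are known (no locality preference).
  def assignPreferredLoc(topic: String, partition: Int, hosts: Seq[String]): Option[String] = {
    val sorted = hosts.sorted
    if (sorted.isEmpty) None
    else Some(sorted(floorMod((topic, partition).hashCode, sorted.length)))
  }

  // Same behavior as the patched getPreferredLocations body:
  // Some(host) becomes Seq(host) and None becomes Seq.empty.
  def preferredLocations(range: OffsetRangeSketch): Seq[String] =
    range.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)
}
```

`map(Seq(_)).getOrElse(Seq.empty)` is equivalent to `preferredLoc.toSeq`. Returning an empty sequence matches the default `RDD.getPreferredLocations`, which reports no locality preference, so partitions without a computed location can still be scheduled on any executor.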
