Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4131623a8 -> e8923d21d


[SPARK-17999][KAFKA][SQL] Add getPreferredLocations for KafkaSourceRDD

## What changes were proposed in this pull request?

The newly implemented Structured Streaming `KafkaSource` calculates the
preferred location for each topic partition, but does not expose this
information through the RDD's `getPreferredLocations` method. This patch adds
that method to `KafkaSourceRDD`.
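
For illustration, the mapping from an optional preferred host to the `Seq[String]`
that `getPreferredLocations` returns can be sketched without any Spark dependency.
`OffsetRange` and `PreferredLocationsSketch` below are hypothetical stand-ins, not
the classes touched by this patch; only the `preferredLoc.map(Seq(_)).getOrElse(Seq.empty)`
expression is taken from the diff.

```scala
// Hypothetical, simplified stand-in for the offset range held by each
// KafkaSourceRDD partition; the real class carries more fields.
final case class OffsetRange(
    topic: String,
    partition: Int,
    preferredLoc: Option[String])

object PreferredLocationsSketch {
  // Same expression as in the patch: a known host becomes a one-element Seq,
  // an unknown host becomes an empty Seq (no locality preference).
  def preferredLocations(range: OffsetRange): Seq[String] =
    range.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)

  def main(args: Array[String]): Unit = {
    val withHost = OffsetRange("events", 0, Some("host1"))
    val withoutHost = OffsetRange("events", 1, None)
    println(preferredLocations(withHost))
    println(preferredLocations(withoutHost))
  }
}
```

An empty result matches what `RDD.getPreferredLocations` returns by default, so
partitions without a known host are scheduled as they were before the change.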

## How was this patch tested?

Manual verification.

Author: jerryshao <[email protected]>

Closes #15545 from jerryshao/SPARK-17999.

(cherry picked from commit 947f4f25273161dc4719419a35613a71c2e2a150)
Signed-off-by: Shixiong Zhu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8923d21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8923d21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8923d21

Branch: refs/heads/branch-2.0
Commit: e8923d21dd9f230e0ac23582033442e6fe476611
Parents: 4131623
Author: jerryshao <[email protected]>
Authored: Thu Oct 20 10:50:34 2016 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Thu Oct 20 10:50:42 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala    | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e8923d21/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index 496af7e..802dd04 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -112,6 +112,11 @@ private[kafka010] class KafkaSourceRDD(
     buf.toArray
   }
 
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val part = split.asInstanceOf[KafkaSourceRDDPartition]
+    part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)
+  }
+
   override def compute(
       thePart: Partition,
       context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {

