This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7142e04639c [SPARK-44248][SS][SQL][KAFKA] Add preferred location in kafka source v2
7142e04639c is described below

commit 7142e04639c1481e41ad499e657b2c62120fe763
Author: Siying Dong <siying.d...@databricks.com>
AuthorDate: Fri Jun 30 06:50:24 2023 +0900

    [SPARK-44248][SS][SQL][KAFKA] Add preferred location in kafka source v2
    
    ### What changes were proposed in this pull request?
    In KafkaBatchInputPartition, which is used for the Kafka v2 source, preferredLocations() now returns the location that was already pre-calculated.
    
    ### Why are the changes needed?
    The DSv2 Kafka streaming source misses setting the preferred location, which may defeat the purpose of the cache for Kafka consumers (connections) and fetched data. For DSv1, we set the preferred location in the RDD, but this information is not returned in DSv2.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Some manual verification.
    
    Closes #41790 from siying/kafkav2loc.
    
    Authored-by: Siying Dong <siying.d...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 508f5c7036b..97c8592d1da 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -34,7 +34,11 @@ private[kafka010] case class KafkaBatchInputPartition(
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean,
-    includeHeaders: Boolean) extends InputPartition
+    includeHeaders: Boolean) extends InputPartition {
+  override def preferredLocations(): Array[String] = {
+    offsetRange.preferredLoc.map(Array(_)).getOrElse(Array())
+  }
+}
 
 private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory with Logging {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {

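For context, the core of the patch is the mapping from an `Option[String]` (the pre-calculated host, if any) to the `Array[String]` that Spark's `InputPartition.preferredLocations()` contract expects. A minimal standalone sketch of that behavior, using a hypothetical `OffsetRangeSketch` case class rather than Spark's actual `KafkaOffsetRange`:

```scala
// Sketch only (not Spark's real classes): `preferredLoc` stands in for the
// executor host pre-computed by the driver. preferredLocations() must return
// an Array[String], and an empty array when there is no preference.
case class OffsetRangeSketch(preferredLoc: Option[String])

def preferredLocations(range: OffsetRangeSketch): Array[String] =
  range.preferredLoc.map(Array(_)).getOrElse(Array.empty[String])
```

Returning the pre-computed host lets the scheduler place the task on the executor that already holds a cached Kafka consumer for that partition, which is the point of the fix.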

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
