This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f6329422639 [SPARK-55561][SS] Add retries for all Kafka admin client 
methods
4f6329422639 is described below

commit 4f6329422639558c37d3b2ea56f35f3a98b1ea4c
Author: Kavpreet Grewal <[email protected]>
AuthorDate: Thu Feb 19 16:36:17 2026 -0800

    [SPARK-55561][SS] Add retries for all Kafka admin client methods
    
    ### What changes were proposed in this pull request?
    
    - Wraps the usage of the Kafka Admin client in 
`KafkaOffsetReaderAdmin.fetchPartitionOffsets` with `withRetries`
    - Updates `withRetries` to work with generic types rather than just 
`Map[TopicPartition, Long]`
    
    ### Why are the changes needed?
    
    In `KafkaOffsetReaderAdmin.fetchPartitionOffsets`, method calls using the 
admin client are not retried, unlike other calls which are wrapped with 
`withRetries`. These calls should also be retried so that we handle transient 
errors gracefully and do not immediately fail the stream.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added new unit tests and existing tests should continue to pass.
    
    Ran tests locally: `build/sbt "sql-kafka-0-10/testOnly 
org.apache.spark.sql.kafka010.KafkaOffsetReaderSuite"`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #54339 from kavpreetgrewal/SPARK-55561-retries.
    
    Authored-by: Kavpreet Grewal <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../sql/kafka010/KafkaOffsetReaderAdmin.scala      |  6 +--
 .../sql/kafka010/KafkaOffsetReaderSuite.scala      | 61 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 3 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index ee674f34d9cb..69f346af19a8 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -127,7 +127,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       logDebug(s"Assigned partitions: $partitions. Seeking to 
$partitionOffsets")
       partitionOffsets
     }
-    val partitions = consumerStrategy.assignedTopicPartitions(admin)
+    val partitions = withRetries { 
consumerStrategy.assignedTopicPartitions(admin) }
     // Obtain TopicPartition offsets with late binding support
     offsetRangeLimit match {
       case EarliestOffsetRangeLimit => partitions.map {
@@ -455,9 +455,9 @@ private[kafka010] class KafkaOffsetReaderAdmin(
    * Retries are needed to handle transient failures. For e.g. race conditions 
between getting
    * assignment and getting position while topics/partitions are deleted can 
cause NPEs.
    */
-  private def withRetries(body: => Map[TopicPartition, Long]): 
Map[TopicPartition, Long] = {
+  private def withRetries[T](body: => T): T = {
     synchronized {
-      var result: Option[Map[TopicPartition, Long]] = None
+      var result: Option[T] = None
       var attempt = 1
       var lastException: Throwable = null
       while (result.isEmpty && attempt <= maxOffsetFetchAttempts
diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
index f50617996428..1674e5a312f0 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
@@ -22,8 +22,11 @@ import java.util.UUID
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.Admin
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.{IsolationLevel, TopicPartition}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, times, verify, when}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
@@ -263,4 +266,62 @@ class KafkaOffsetReaderSuite extends QueryTest with 
SharedSparkSession with Kafk
       }
     }
   }
+
+  private def createReaderWithMockedStrategy(
+      mockStrategy: ConsumerStrategy): KafkaOffsetReaderAdmin = {
+    new KafkaOffsetReaderAdmin(
+      mockStrategy,
+      KafkaSourceProvider.kafkaParamsForDriver(Map(
+        "bootstrap.servers" -> testUtils.brokerAddress
+      )),
+      CaseInsensitiveMap(Map(
+        KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY -> "3",
+        KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS -> "0"
+      )),
+      ""
+    )
+  }
+
+  test("SPARK-55561: fetchPartitionOffsets retries on transient failures") {
+    val tp0 = new TopicPartition("topic", 0)
+    val tp1 = new TopicPartition("topic", 1)
+    val expectedPartitions = Set(tp0, tp1)
+
+    val mockStrategy = mock(classOf[ConsumerStrategy])
+    val mockAdmin = mock(classOf[Admin])
+    when(mockStrategy.createAdmin(any())).thenReturn(mockAdmin)
+    when(mockStrategy.assignedTopicPartitions(any()))
+      .thenThrow(new RuntimeException("Transient error"))
+      .thenThrow(new RuntimeException("Transient error"))
+      .thenReturn(expectedPartitions)
+
+    val reader = createReaderWithMockedStrategy(mockStrategy)
+    try {
+      val result = reader.fetchPartitionOffsets(
+        EarliestOffsetRangeLimit, isStartingOffsets = true)
+      assert(result === expectedPartitions.map(tp => tp -> 
KafkaOffsetRangeLimit.EARLIEST).toMap)
+      verify(mockStrategy, times(3)).assignedTopicPartitions(any())
+    } finally {
+      reader.close()
+    }
+  }
+
+  test("SPARK-55561: fetchPartitionOffsets throws after all retries 
exhausted") {
+    val mockStrategy = mock(classOf[ConsumerStrategy])
+    val mockAdmin = mock(classOf[Admin])
+    when(mockStrategy.createAdmin(any())).thenReturn(mockAdmin)
+    when(mockStrategy.assignedTopicPartitions(any()))
+      .thenThrow(new RuntimeException("Persistent error"))
+
+    val reader = createReaderWithMockedStrategy(mockStrategy)
+    try {
+      val ex = intercept[RuntimeException] {
+        reader.fetchPartitionOffsets(EarliestOffsetRangeLimit, 
isStartingOffsets = true)
+      }
+      assert(ex.getMessage === "Persistent error")
+      verify(mockStrategy, times(3)).assignedTopicPartitions(any())
+    } finally {
+      reader.close()
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to