This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4f6329422639 [SPARK-55561][SS] Add retries for all Kafka admin client
methods
4f6329422639 is described below
commit 4f6329422639558c37d3b2ea56f35f3a98b1ea4c
Author: Kavpreet Grewal <[email protected]>
AuthorDate: Thu Feb 19 16:36:17 2026 -0800
[SPARK-55561][SS] Add retries for all Kafka admin client methods
### What changes were proposed in this pull request?
- Wraps the usage of the Kafka Admin client in
`KafkaOffsetReaderAdmin.fetchPartitionOffsets` with `withRetries`
- Updates `withRetries` to work with generic types rather than just
`Map[TopicPartition, Long]`
### Why are the changes needed?
In `KafkaOffsetReaderAdmin.fetchPartitionOffsets`, method calls using the
admin client are not retried, unlike other calls which are wrapped with
`withRetries`. These calls should also be retried so that we handle transient
errors gracefully and do not immediately fail the stream.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added new unit tests and existing tests should continue to pass.
Ran tests locally: `build/sbt "sql-kafka-0-10/testOnly
org.apache.spark.sql.kafka010.KafkaOffsetReaderSuite"`
### Was this patch authored or co-authored using generative AI tooling?
Closes #54339 from kavpreetgrewal/SPARK-55561-retries.
Authored-by: Kavpreet Grewal <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../sql/kafka010/KafkaOffsetReaderAdmin.scala | 6 +--
.../sql/kafka010/KafkaOffsetReaderSuite.scala | 61 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 3 deletions(-)
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index ee674f34d9cb..69f346af19a8 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -127,7 +127,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
logDebug(s"Assigned partitions: $partitions. Seeking to
$partitionOffsets")
partitionOffsets
}
- val partitions = consumerStrategy.assignedTopicPartitions(admin)
+ val partitions = withRetries {
consumerStrategy.assignedTopicPartitions(admin) }
// Obtain TopicPartition offsets with late binding support
offsetRangeLimit match {
case EarliestOffsetRangeLimit => partitions.map {
@@ -455,9 +455,9 @@ private[kafka010] class KafkaOffsetReaderAdmin(
* Retries are needed to handle transient failures. For e.g. race conditions
between getting
* assignment and getting position while topics/partitions are deleted can
cause NPEs.
*/
- private def withRetries(body: => Map[TopicPartition, Long]):
Map[TopicPartition, Long] = {
+ private def withRetries[T](body: => T): T = {
synchronized {
- var result: Option[Map[TopicPartition, Long]] = None
+ var result: Option[T] = None
var attempt = 1
var lastException: Throwable = null
while (result.isEmpty && attempt <= maxOffsetFetchAttempts
diff --git
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
index f50617996428..1674e5a312f0 100644
---
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
+++
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
@@ -22,8 +22,11 @@ import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, times, verify, when}
import org.apache.spark.SparkException
import org.apache.spark.sql.QueryTest
@@ -263,4 +266,62 @@ class KafkaOffsetReaderSuite extends QueryTest with
SharedSparkSession with Kafk
}
}
}
+
+ private def createReaderWithMockedStrategy(
+ mockStrategy: ConsumerStrategy): KafkaOffsetReaderAdmin = {
+ new KafkaOffsetReaderAdmin(
+ mockStrategy,
+ KafkaSourceProvider.kafkaParamsForDriver(Map(
+ "bootstrap.servers" -> testUtils.brokerAddress
+ )),
+ CaseInsensitiveMap(Map(
+ KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY -> "3",
+ KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS -> "0"
+ )),
+ ""
+ )
+ }
+
+ test("SPARK-55561: fetchPartitionOffsets retries on transient failures") {
+ val tp0 = new TopicPartition("topic", 0)
+ val tp1 = new TopicPartition("topic", 1)
+ val expectedPartitions = Set(tp0, tp1)
+
+ val mockStrategy = mock(classOf[ConsumerStrategy])
+ val mockAdmin = mock(classOf[Admin])
+ when(mockStrategy.createAdmin(any())).thenReturn(mockAdmin)
+ when(mockStrategy.assignedTopicPartitions(any()))
+ .thenThrow(new RuntimeException("Transient error"))
+ .thenThrow(new RuntimeException("Transient error"))
+ .thenReturn(expectedPartitions)
+
+ val reader = createReaderWithMockedStrategy(mockStrategy)
+ try {
+ val result = reader.fetchPartitionOffsets(
+ EarliestOffsetRangeLimit, isStartingOffsets = true)
+ assert(result === expectedPartitions.map(tp => tp ->
KafkaOffsetRangeLimit.EARLIEST).toMap)
+ verify(mockStrategy, times(3)).assignedTopicPartitions(any())
+ } finally {
+ reader.close()
+ }
+ }
+
+ test("SPARK-55561: fetchPartitionOffsets throws after all retries
exhausted") {
+ val mockStrategy = mock(classOf[ConsumerStrategy])
+ val mockAdmin = mock(classOf[Admin])
+ when(mockStrategy.createAdmin(any())).thenReturn(mockAdmin)
+ when(mockStrategy.assignedTopicPartitions(any()))
+ .thenThrow(new RuntimeException("Persistent error"))
+
+ val reader = createReaderWithMockedStrategy(mockStrategy)
+ try {
+ val ex = intercept[RuntimeException] {
+ reader.fetchPartitionOffsets(EarliestOffsetRangeLimit,
isStartingOffsets = true)
+ }
+ assert(ex.getMessage === "Persistent error")
+ verify(mockStrategy, times(3)).assignedTopicPartitions(any())
+ } finally {
+ reader.close()
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]