This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 02f32ee358c [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset
02f32ee358c is described below
commit 02f32ee358cc0a398aa7321bc5613cb92b306f6f
Author: wecharyu <[email protected]>
AuthorDate: Thu Dec 8 17:12:30 2022 +0900
[SPARK-41375][SS] Avoid empty latest KafkaSourceOffset
### What changes were proposed in this pull request?
Add an empty-offset filter in `latestOffset()` for the Kafka source, so that
the offset remains unchanged when Kafka returns no topic partitions during a fetch.
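The fix boils down to the guard below (lifted from the diff in this commit): return `null` instead of an empty `KafkaSourceOffset`, so the previously committed offsets stay in effect.

```scala
// If the fetched offsets contain no partitions, report no offset (null)
// rather than an empty one; the previous offsets then remain unchanged.
Option(KafkaSourceOffset(offsets))
  .filterNot(_.partitionToOffsets.isEmpty)
  .orNull
```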
### Why are the changes needed?
In some extreme cases, `KafkaOffsetReader` may fetch an empty set of partitions,
for example when partitions are requested while the Kafka cluster is reassigning
them. This produces an empty `PartitionOffsetMap` (even though some
topic-partitions are in fact unchanged), which is stored in `committedOffsets`
after `runBatch()`. In the next batch, partitions are fetched normally and the
actual offsets are obtained, but when the batch's data is fetched in
`KafkaOffsetReaderAdmin#getOffsetRangesFromResolvedOffsets()`, every partition
in endOffsets is treated as a new partition because startOffsets is empty.
These "new" partitions then fetch earliest offsets, which causes data
duplication.
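A simplified sketch of the failure mode (the helper name and signature below are illustrative, not Spark's actual code):

```scala
import org.apache.kafka.common.TopicPartition

// Sketch of the logic in getOffsetRangesFromResolvedOffsets: with an empty
// startOffsets map, every partition in endOffsets is classified as "new".
def illustrateDuplication(
    startOffsets: Map[TopicPartition, Long],    // empty after the bad batch
    endOffsets: Map[TopicPartition, Long],      // real offsets of this batch
    earliestOffsets: Map[TopicPartition, Long]  // fallback for "new" partitions
  ): Map[TopicPartition, Long] = {
  val newPartitions = endOffsets.keySet -- startOffsets.keySet
  // Each "new" partition restarts from its earliest offset, replaying
  // records that an earlier batch already processed.
  startOffsets ++ newPartitions.map(tp => tp -> earliestOffsets(tp))
}
```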
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add a unit test.
Closes #38898 from wecharyu/SPARK-41375.
Authored-by: wecharyu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 043475a87844f11c252fb0ebab469148ae6985d7)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/kafka010/KafkaMicroBatchStream.scala | 7 ++--
.../apache/spark/sql/kafka010/KafkaSource.scala | 4 +--
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 39 ++++++++++++++++++++++
3 files changed, 43 insertions(+), 7 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 77bc658a1ef..a371d25899d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -85,8 +85,6 @@ private[kafka010] class KafkaMicroBatchStream(
private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
- private var endPartitionOffsets: KafkaSourceOffset = _
-
private var latestPartitionOffsets: PartitionOffsetMap = _
private var allDataForTriggerAvailableNow: PartitionOffsetMap = _
@@ -114,7 +112,7 @@ private[kafka010] class KafkaMicroBatchStream(
}
override def reportLatestOffset(): Offset = {
- KafkaSourceOffset(latestPartitionOffsets)
+ Option(KafkaSourceOffset(latestPartitionOffsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
}
override def latestOffset(): Offset = {
@@ -163,8 +161,7 @@ private[kafka010] class KafkaMicroBatchStream(
}.getOrElse(latestPartitionOffsets)
}
- endPartitionOffsets = KafkaSourceOffset(offsets)
- endPartitionOffsets
+ Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
}
/** Checks if we need to skip this trigger based on minOffsetsPerTrigger & maxTriggerDelay */
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index c82fda85eb4..b84643533f8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -177,7 +177,7 @@ private[kafka010] class KafkaSource(
kafkaReader.fetchLatestOffsets(currentOffsets)
}
- latestPartitionOffsets = Some(latest)
+ latestPartitionOffsets = if (latest.isEmpty) None else Some(latest)
val limits: Seq[ReadLimit] = limit match {
case rows: CompositeReadLimit => rows.getReadLimits
@@ -213,7 +213,7 @@ private[kafka010] class KafkaSource(
}
currentPartitionOffsets = Some(offsets)
logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
- KafkaSourceOffset(offsets)
+ Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
}
/** Checks if we need to skip this trigger based on minOffsetsPerTrigger & maxTriggerDelay */
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index db71f0fd918..e033f13ebf6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -624,6 +624,45 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
)
}
+ test("SPARK-41375: empty partitions should not record to latest offset") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-good"
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, Array("-1"))
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.request.timeout.ms", "3000")
+ .option("kafka.default.api.timeout.ms", "3000")
+ .option("subscribePattern", s"$topicPrefix-.*")
+ .option("failOnDataLoss", "false")
+
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ AddKafkaData(Set(topic), 1, 2, 3),
+ CheckAnswer(2, 3, 4),
+ Assert {
+ testUtils.deleteTopic(topic)
+ true
+ },
+ AssertOnQuery { q =>
+ val latestOffset: Option[(Long, OffsetSeq)] = q.offsetLog.getLatest
+ latestOffset.exists { offset =>
+ !offset._2.offsets.exists(_.exists(_.json == "{}"))
+ }
+ }
+ )
+ }
+
test("subscribe topic by pattern with topic recreation between batches") {
val topicPrefix = newTopic()
val topic = topicPrefix + "-good"