This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 539cbe8fa80f [SPARK-55601][SS][FOLLOWUP] Cache offsetLog.getLatest()
to avoid redundant ListStatus during MicroBatchExecution init
539cbe8fa80f is described below
commit 539cbe8fa80fde99feaf2c92fab2ac34719ab03f
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue May 26 10:37:33 2026 +0800
[SPARK-55601][SS][FOLLOWUP] Cache offsetLog.getLatest() to avoid redundant
ListStatus during MicroBatchExecution init
### What changes were proposed in this pull request?
Followup to https://github.com/apache/spark/pull/54373.
SPARK-55601 added a new `offsetLog.getLatest()` call inside `logicalPlan`'s
computation to derive `enforceNamed` from the last written offset log entry.
`initializeExecution` already calls `offsetLog.getLatest()` on its first line.
Both calls happen on the query thread during stream startup, with no offset log
writes in between, so the two reads always return the same value. The second
one is wasted work: each `getLatest()` triggers `listBatches` →
`HDFSMetadataLog.list` → a filesystem [...]
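The `enforceNamed` derivation reads a value from the latest offset log entry. It can be pictured as a plain `Option` chain that falls back to the session config. The sketch below is a simplified model, not the Spark code. `OffsetSeqLike` and the string key `hypothetical.sourceEvolution.enabled` are hypothetical stand-ins for `OffsetSeqBase` and `SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION`. The `getOrElse` fallback is an assumption based on the code comment in the diff ("If no entries are found, use the session config value").

```scala
// Hypothetical simplified stand-in for OffsetSeqBase: only the metadata map matters here.
final case class OffsetSeqLike(metadataOpt: Option[Map[String, String]])

object EnforceNamedSketch {
  // Hypothetical key; the real code reads SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.
  val SourceEvolutionKey = "hypothetical.sourceEvolution.enabled"

  /** Prefer the value recorded in the latest offset log entry, else use the session config. */
  def deriveEnforceNamed(
      latest: Option[(Long, OffsetSeqLike)],
      sessionConfValue: Boolean): Boolean =
    latest.flatMap { case (_, offsetSeq) =>
      offsetSeq.metadataOpt.flatMap { metadata =>
        metadata.get(SourceEvolutionKey).map(_.toBoolean)
      }
    }.getOrElse(sessionConfValue)
}
```

Because the whole derivation consumes a single `Option[(Long, OffsetSeq)]`, it does not matter whether that value comes from a fresh `getLatest()` or from a cached one. All that matters is that both values are the same.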
This PR caches the first read in a `private lazy val
initialLatestOffsetSeq` on `MicroBatchExecution` and routes both call sites
through it:
- `enforceNamed` derivation in `logicalPlan` lazy val.
- `var latestStartedBatch` initialization in `initializeExecution`.
Subsequent reads inside `initializeExecution` (after a `purgeAfter`) and in
`populateStartOffsets` are unchanged — those legitimately need fresh
`getLatest()` results.
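Here is a minimal sketch of the caching pattern, built on two hypothetical classes. In `CountingOffsetLog`, each `getLatest()` call stands in for one `ListStatus` on `<checkpoint>/offsets/`. `StartupSketch` takes the place of `MicroBatchExecution` and keeps only the two startup call sites plus one later fresh read. Both startup sites read the same `lazy val`, so the log is listed once. The `populateStartOffsets`-style read still goes straight to the log.

```scala
// Hypothetical offset log that counts simulated directory listings.
final class CountingOffsetLog(batchIds: Seq[Long]) {
  var listCalls = 0

  // Each call models one ListStatus on `<checkpoint>/offsets/`.
  def getLatest(): Option[Long] = {
    listCalls += 1
    batchIds.maxOption
  }
}

// Hypothetical stand-in for MicroBatchExecution, reduced to the caching pattern.
final class StartupSketch(offsetLog: CountingOffsetLog) {
  // Evaluated at most once, on first access; later accesses reuse the result.
  private lazy val initialLatestOffsetSeq: Option[Long] = offsetLog.getLatest()

  // Call site 1: analogous to the `enforceNamed` derivation in `logicalPlan`.
  lazy val logicalPlanSeed: Option[Long] = initialLatestOffsetSeq

  // Call site 2: analogous to seeding `latestStartedBatch` in `initializeExecution`.
  def initializeExecution(): Option[Long] = initialLatestOffsetSeq

  // Analogous to `populateStartOffsets`: deliberately bypasses the cache.
  def populateStartOffsets(): Option[Long] = offsetLog.getLatest()
}
```

Scala initializes a `lazy val` under synchronization, so it is evaluated only once even under concurrent access. The correctness of reusing the value, however, rests on the separate argument that no offset log write happens between the two startup reads.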
### Why are the changes needed?
Avoids one redundant `ListStatus` on `<checkpoint>/offsets/` per stream
startup. The cost is small but unnecessary, and downstream consumers that track
per-checkpoint filesystem operations (for tracing, auditing, or test
invariants) currently see one extra op against the offsets directory because of
this duplication.
### Does this PR introduce _any_ user-facing change?
No. Same behavior, fewer filesystem calls.
### How was this patch tested?
Existing `MicroBatchExecutionSuite` and downstream streaming-startup tests
cover both call sites. The change is a pure caching refactor; the cached value
is identical to what a second `getLatest()` would return because nothing else
writes the offset log between construction and `initializeExecution` on the
query thread.
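The safety argument is that the cached value matches a second `getLatest()` only because nothing writes the offset log in between. A toy model can check this argument. `InMemoryOffsetLog` and `CacheSafetySketch` are hypothetical. The model's `purgeAfter` simply drops entries whose batch id is greater than the threshold, loosely modeled on the metadata log's `purgeAfter`. A mutation between the snapshot and the fresh read is exactly what makes the later reads in `initializeExecution` need an uncached `getLatest()`.

```scala
import scala.collection.mutable

// Hypothetical in-memory offset log; `add` and `purgeAfter` are its only mutations.
final class InMemoryOffsetLog {
  private val entries = mutable.TreeMap.empty[Long, String]

  def add(batchId: Long, offsets: String): Unit = entries.update(batchId, offsets)

  // Highest batch id wins because TreeMap iterates in key order.
  def getLatest(): Option[(Long, String)] = entries.lastOption

  // Drop every entry later than `batchId` (materialize keys before mutating).
  def purgeAfter(batchId: Long): Unit =
    entries.keys.filter(_ > batchId).toList.foreach(k => entries.remove(k))
}

object CacheSafetySketch {
  /** Snapshot getLatest(), run `between`, then compare the snapshot with a fresh read. */
  def cachedMatchesFresh(log: InMemoryOffsetLog)(between: InMemoryOffsetLog => Unit): Boolean = {
    val cached = log.getLatest()
    between(log)
    cached == log.getLatest()
  }
}
```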
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
Closes #56054 from cloud-fan/SPARK-55601-followup.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit bcc29db2540682e42b94fb0b393cee5a45d1d8f0)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/streaming/runtime/MicroBatchExecution.scala | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index b499e676a84a..726586ac72e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -183,6 +183,16 @@ class MicroBatchExecution(
// into every subsequent batch's query plan.
private val stateSchemaMetadatas = MutableMap[Long, StateSchemaBroadcast]()
+ /**
+ * Cached result of the first `offsetLog.getLatest()` call. Reused by both
+ * `logicalPlan` (to determine `enforceNamed`) and `initializeExecution` (to seed
+ * `latestStartedBatch`). This avoids a redundant `ListStatus` on the checkpoint's
+ * `offsets/` directory during stream startup. Safe to cache: between construction
+ * and `initializeExecution`, nothing else writes the offset log on the query thread.
+ */
+ private lazy val initialLatestOffsetSeq: Option[(Long, OffsetSeqBase)] =
+ offsetLog.getLatest()
+
override lazy val logicalPlan: LogicalPlan = {
assert(queryExecutionThread eq Thread.currentThread,
"logicalPlan must be initialized in QueryExecutionThread " +
@@ -204,7 +214,7 @@ class MicroBatchExecution(
// Read the source evolution enforcement from the last written offset log entry. If no entries
// are found, use the session config value.
- val enforceNamed = offsetLog.getLatest().flatMap { case (_, offsetSeq) =>
+ val enforceNamed = initialLatestOffsetSeq.flatMap { case (_, offsetSeq) =>
offsetSeq.metadataOpt.flatMap { metadata =>
OffsetSeqMetadata.readValueOpt(metadata,
SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION)
.map(_.toBoolean)
@@ -451,7 +461,7 @@ class MicroBatchExecution(
private def initializeExecution(
sparkSessionForStream: SparkSession): MicroBatchExecutionContext = {
- var latestStartedBatch = offsetLog.getLatest()
+ var latestStartedBatch = initialLatestOffsetSeq
val latestCommittedBatch = commitLog.getLatest()
val lastCommittedBatchId = latestCommittedBatch match {