This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c26d0bc726ea [SPARK-55701][SS] Fix race condition in CompactibleFileStreamLog.allFiles
c26d0bc726ea is described below
commit c26d0bc726eaf04f1270cef36c340df098b486e2
Author: zeruibao <[email protected]>
AuthorDate: Fri Feb 27 14:25:30 2026 +0900
[SPARK-55701][SS] Fix race condition in CompactibleFileStreamLog.allFiles
### What changes were proposed in this pull request?
Changed the exception type thrown in `CompactibleFileStreamLog.allFiles()`
from `IllegalStateException` to `FileNotFoundException` when a batch metadata
file is missing (line 270). Since `FileNotFoundException` extends
`IOException`, the existing retry loop (line 277) now catches this case and
retries with an updated `latestId`.
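As an illustration of why the one-type change is sufficient (hypothetical helper, not part of the patch): `FileNotFoundException` is a subclass of `IOException`, so a `catch` clause on `IOException` also handles it, whereas `IllegalStateException` (a `RuntimeException`) does not match and escapes:

```scala
import java.io.{FileNotFoundException, IOException}

// Hypothetical predicate mirroring the retry loop's catch clause:
// IOException and its subclasses (including FileNotFoundException) are
// retried; anything else propagates to the caller as a fatal error.
def handledByRetryLoop(t: Throwable): Boolean = t match {
  case _: IOException => true   // FileNotFoundException lands here
  case _              => false  // IllegalStateException escapes the loop
}
```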
### Why are the changes needed?
There is a race condition between a batch reader (e.g., `DESCRIBE TABLE`
via Thrift server) and a streaming writer performing compaction + cleanup
concurrently:
1. The reader calls `getLatestBatchId()` and observes `latestId = N`.
2. The writer completes a new compaction batch and `deleteExpiredLog`
removes old batch files.
3. The reader tries to read the now-deleted batch files based on the stale
`latestId`.
The `allFiles()` method already has a retry loop designed to handle this
exact scenario — it catches `IOException`, refreshes `latestId`, and retries.
However, the missing-file case was throwing `IllegalStateException`, which is
not a subclass of `IOException`, so it escaped the retry loop entirely and
surfaced as a fatal error to the user.
The fix changes the exception to `FileNotFoundException` so the existing
retry logic handles it correctly. The safety check on re-throw (lines 284-286)
ensures that if no newer compaction exists, the exception is still propagated
rather than silently swallowed.
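The retry-and-recheck pattern described above can be sketched as follows (a simplified, hypothetical version with made-up names such as `readWithRetry`, `refresh`, and `read`; the real `allFiles()` additionally uses `nextCompactionBatchId` for its safety check):

```scala
import java.io.IOException

// Simplified sketch of the allFiles() retry pattern. `refresh` stands in
// for super.getLatestBatchId() (re-listing the filesystem); `read` stands
// in for reading all batch files reachable from a given latestId.
def readWithRetry(startId: Long,
                  refresh: () => Long,
                  read: Long => Seq[String]): Seq[String] = {
  var latestId = startId
  while (true) {
    try {
      return read(latestId)
    } catch {
      case e: IOException =>
        val refreshed = refresh()
        // Safety check: if no newer batch has appeared, the file is
        // genuinely missing, so propagate rather than loop forever.
        if (refreshed <= latestId) throw e
        latestId = refreshed  // retry with the updated id
    }
  }
  sys.error("unreachable")
}
```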
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit test added in `CompactibleFileStreamLogSuite`.
### Was this patch authored or co-authored using generative AI tooling?
Got help from Claude 4.6 Opus, but also reviewed it carefully.
Closes #54500 from zeruibao/SPARK-55701-fix-race-condition.
Authored-by: zeruibao <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../runtime/CompactibleFileStreamLog.scala | 2 +-
.../streaming/CompactibleFileStreamLogSuite.scala | 49 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/CompactibleFileStreamLog.scala
index 8a90982b7c0c..7efe7b52c7ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/CompactibleFileStreamLog.scala
@@ -267,7 +267,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
val logs =
getAllValidBatches(latestId, compactInterval).flatMap { id =>
filterInBatch(id)(shouldRetain(_, curTime)).getOrElse {
- throw new IllegalStateException(
+ throw new FileNotFoundException(
s"${batchIdToPath(id)} doesn't exist " +
s"(latestId: $latestId, compactInterval: $compactInterval)")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 5a1608cb6165..b1b1976dff8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -258,6 +258,55 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession {
})
}
+ test("allFiles retries when compaction deletes batch files during read") {
+ withTempDir { dir =>
+ // Override getLatestBatchId() so the first call returns a stale value (4),
+ // simulating the reader listing the directory before compaction batch 5 is visible.
+ // The retry in allFiles() calls super.getLatestBatchId() (statically bound to
+ // HDFSMetadataLog), which bypasses this override and reads the real filesystem.
+ @volatile var staleLatestId: Option[Long] = None
+ val compactibleLog = new FakeCompactibleFileStreamLog(
+ FakeCompactibleFileStreamLog.VERSION,
+ _fileCleanupDelayMs = Long.MaxValue,
+ _defaultCompactInterval = 3,
+ _defaultMinBatchesToRetain = 1,
+ spark,
+ dir.getCanonicalPath) {
+ override def getLatestBatchId(): Option[Long] = {
+ staleLatestId match {
+ case some @ Some(_) =>
+ staleLatestId = None
+ some
+ case None =>
+ super.getLatestBatchId()
+ }
+ }
+ }
+
+ val fs = compactibleLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+ for (batchId <- 0 to 5) {
+ compactibleLog.add(batchId, Array("path_" + batchId))
+ }
+
+ // Delete old files as if a concurrent writer's deleteExpiredLog removed them
+ // after writing compaction batch 5.
+ fs.delete(compactibleLog.batchIdToPath(2), false)
+ fs.delete(compactibleLog.batchIdToPath(3), false)
+ fs.delete(compactibleLog.batchIdToPath(4), false)
+
+ // Inject stale latestId=4 right before calling allFiles().
+ // First iteration: getLatestBatchId() returns 4 (stale).
+ // getAllValidBatches(4, 3) = [2, 3, 4]. Batch 2 is deleted -> FileNotFoundException.
+ // Retry: super.getLatestBatchId() reads filesystem -> returns 5.
+ // nextCompactionBatchId(4, 3) = 5, and 5 >= 5, so retry proceeds.
+ // Second iteration: getAllValidBatches(5, 3) = [5]. Reads 5.compact -> success.
+ staleLatestId = Some(4L)
+ val result = compactibleLog.allFiles()
+ assert(result === (0 to 5).map("path_" + _))
+ }
+ }
+
private def withFakeCompactibleFileStreamLog(
fileCleanupDelayMs: Long,
defaultCompactInterval: Int,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]