This is an automated email from the ASF dual-hosted git repository.
HeartSaVioR pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new ae9e686b80d1 [SPARK-56720][SS] Fail subsequent async log writes after
a prior failure in async progress tracking
ae9e686b80d1 is described below
commit ae9e686b80d10ea3803f131f4ec87588703ee99e
Author: Yuchen Liu <[email protected]>
AuthorDate: Fri May 15 12:04:21 2026 -0700
[SPARK-56720][SS] Fail subsequent async log writes after a prior failure in
async progress tracking
### What changes were proposed in this pull request?
Pass the shared error recorder, `ErrorNotifier`, to both `AsyncOffsetSeqLog`
and `AsyncCommitLog`. Once the first failure is recorded:
- Subsequent offset and commit log write tasks short-circuit by failing
with the original error without touching durable storage.
- The first error is preserved via `compareAndSet(null, err)` so it is not
overwritten by later cascading failures; later distinct errors are attached
to it as suppressed exceptions instead of being dropped.
The shared `ErrorNotifier` is owned by `AsyncProgressTrackingMicroBatchExecution`
and threaded through `AsyncStreamingQueryCheckpointMetadata` to both async
logs. The existing `.exceptionally` handlers in `markMicroBatchStart` /
`markMicroBatchEnd` report failures to this same notifier, so it is also
populated when the failure originates from `addAsync`'s own `thenApply`
(e.g. `concurrentStreamLogUpdate`). This diff does not add a separate
`recordAsyncWriteError` helper. The only change to
`AsyncProgressTrackingMicroBatchExecution` is passing `errorNotifier` into the
async checkpoint metadata.
### Why are the changes needed?
When async progress tracking is enabled, offset and commit log writes are
submitted to a single-threaded executor service. If one async write task fails,
follow-up writes already queued (or queued shortly afterward) can still proceed
and persist files to durable storage, leaving inconsistent state on disk — for
example a commit-log entry written without its corresponding offset-log entry.
The original error can also be overwritten in the `ErrorNotifier` by a
later cascading failure, so the user-visible exception masks the root cause
(e.g. surfacing `concurrentStreamLogUpdate` instead of the actual `Permission
denied` / IOException that started the cascade).
### Does this PR introduce _any_ user-facing change?
No. Async-progress-tracking queries that hit a write failure will now
surface the root cause instead of a later cascading error, but the failure mode
(query termination with a `StreamingQueryException`) is unchanged.
### How was this patch tested?
Added a regression test in `AsyncProgressTrackingMicroBatchExecutionSuite`
that triggers a real I/O failure on the first offset write and verifies:
1. The shared `ErrorNotifier` records the first error.
2. A follow-up commit-log write short-circuits with the original error and
produces no commit file.
3. A follow-up offset-log write for the next batch also short-circuits with
the same first error.
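4. The error recorded in the shared `ErrorNotifier` is not overwritten by
later cascading failures.
The existing permission-denied test now uses `TestUtils.assertExceptionMsg`,
which searches the whole cause chain, instead of asserting on the message at
a fixed `e.getCause.getCause` depth.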
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4.7)
Closes #55676 from eason-yuchen-liu/ss-async-log-fail-fast.
Authored-by: Yuchen Liu
<[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit a87c133c2bef4d1041775e42c833f4a2a7a28870)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../streaming/checkpointing/AsyncCommitLog.scala | 15 +++++-
.../checkpointing/AsyncOffsetSeqLog.scala | 12 ++++-
.../AsyncProgressTrackingMicroBatchExecution.scala | 3 +-
.../AsyncStreamingQueryCheckpointMetadata.scala | 12 +++--
.../streaming/runtime/ErrorNotifier.scala | 16 ++++--
...cProgressTrackingMicroBatchExecutionSuite.scala | 57 ++++++++++++++++++++--
6 files changed, 101 insertions(+), 14 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala
index 116ea18326ef..7a6c26b249e9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala
@@ -25,11 +25,16 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.internal.LogKeys
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.streaming.runtime.ErrorNotifier
/**
* Implementation of CommitLog to perform asynchronous writes to storage
*/
-class AsyncCommitLog(sparkSession: SparkSession, path: String,
executorService: ThreadPoolExecutor)
+class AsyncCommitLog(
+ sparkSession: SparkSession,
+ path: String,
+ executorService: ThreadPoolExecutor,
+ errorNotifier: ErrorNotifier = new ErrorNotifier())
extends CommitLog(sparkSession, path) {
// the cache needs to be enabled because we may not be persisting every
entry to durable storage
@@ -109,6 +114,14 @@ class AsyncCommitLog(sparkSession: SparkSession, path:
String, executorService:
executorService.submit(new Runnable {
override def run(): Unit = {
try {
+ val priorError = errorNotifier.getError().orNull
+ if (priorError != null) {
+ logWarning(log"Skipping async commit write for batch " +
+ log"${MDC(LogKeys.BATCH_ID, batchId)} because a previous async
" +
+ log"write task already failed", priorError)
+ future.completeExceptionally(priorError)
+ return
+ }
if (fileManager.exists(batchMetadataFile)) {
future.complete(false)
} else {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncOffsetSeqLog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncOffsetSeqLog.scala
index e6ba644ed483..eea10f3d5a93 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncOffsetSeqLog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncOffsetSeqLog.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.internal.{LogKeys}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.streaming.runtime.ErrorNotifier
import org.apache.spark.util.{Clock, SystemClock}
/**
@@ -36,7 +37,8 @@ class AsyncOffsetSeqLog(
path: String,
executorService: ThreadPoolExecutor,
offsetCommitIntervalMs: Long,
- clock: Clock = new SystemClock())
+ clock: Clock = new SystemClock(),
+ errorNotifier: ErrorNotifier = new ErrorNotifier())
extends OffsetSeqLog(sparkSession, path) {
// the cache needs to be enabled because we may not be persisting every
entry to durable storage
@@ -143,6 +145,14 @@ class AsyncOffsetSeqLog(
executorService.submit(new Runnable {
override def run(): Unit = {
try {
+ val priorError = errorNotifier.getError().orNull
+ if (priorError != null) {
+ logWarning(log"Skipping async offset write for batch " +
+ log"${MDC(LogKeys.BATCH_ID, batchId)} because a previous async
" +
+ log"write task already failed", priorError)
+ future.completeExceptionally(priorError)
+ return
+ }
if (fileManager.exists(batchMetadataFile)) {
future.complete(false)
} else {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncProgressTrackingMicroBatchExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncProgressTrackingMicroBatchExecution.scala
index 6bebad5ce54f..156f2b8b169a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncProgressTrackingMicroBatchExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncProgressTrackingMicroBatchExecution.scala
@@ -94,7 +94,8 @@ class AsyncProgressTrackingMicroBatchExecution(
resolvedCheckpointRoot,
asyncWritesExecutorService,
asyncProgressTrackingCheckpointingIntervalMs,
- triggerClock
+ triggerClock,
+ errorNotifier
)
override lazy val offsetLog: AsyncOffsetSeqLog =
asyncCheckpointMetadata.offsetLog
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncStreamingQueryCheckpointMetadata.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncStreamingQueryCheckpointMetadata.scala
index 65113ce15c39..39622392aa62 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncStreamingQueryCheckpointMetadata.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncStreamingQueryCheckpointMetadata.scala
@@ -30,13 +30,17 @@ import org.apache.spark.util.Clock
* @param asyncWritesExecutorService The executor service for async writes
* @param asyncProgressTrackingCheckpointingIntervalMs The interval for async
progress
* @param triggerClock The clock to use for trigger time
+ * @param errorNotifier Shared error sink between the offset and commit logs.
Once a
+ * first error is recorded, subsequent async write tasks
short-circuit
+ * with that error instead of writing to durable storage.
*/
class AsyncStreamingQueryCheckpointMetadata(
sparkSession: SparkSession,
resolvedCheckpointRoot: String,
asyncWritesExecutorService: ThreadPoolExecutor,
asyncProgressTrackingCheckpointingIntervalMs: Long,
- triggerClock: Clock)
+ triggerClock: Clock,
+ errorNotifier: ErrorNotifier)
extends StreamingQueryCheckpointMetadata(sparkSession,
resolvedCheckpointRoot) {
override lazy val offsetLog = new AsyncOffsetSeqLog(
@@ -44,13 +48,15 @@ class AsyncStreamingQueryCheckpointMetadata(
checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS),
asyncWritesExecutorService,
asyncProgressTrackingCheckpointingIntervalMs,
- clock = triggerClock
+ clock = triggerClock,
+ errorNotifier = errorNotifier
)
override lazy val commitLog = new AsyncCommitLog(
sparkSession,
checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS),
- asyncWritesExecutorService
+ asyncWritesExecutorService,
+ errorNotifier = errorNotifier
)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ErrorNotifier.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ErrorNotifier.scala
index 19867c516837..2ba696173954 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ErrorNotifier.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ErrorNotifier.scala
@@ -28,10 +28,20 @@ class ErrorNotifier extends Logging {
private val error = new AtomicReference[Throwable]
- /** To indicate any errors that have occurred */
+ /**
+ * Record a fatal error. Only the first error is retained; later distinct
+ * errors are attached to it as suppressed so cascades cannot mask the cause.
+ */
def markError(th: Throwable): Unit = {
- logError("A fatal error has occurred.", th)
- error.set(th)
+ if (error.compareAndSet(null, th)) {
+ logError("A fatal error has occurred.", th)
+ } else {
+ // Attach subsequent errors as suppressed so they're not silently lost.
+ val existing = error.get()
+ if (existing != null && existing != th) {
+ existing.addSuppressed(th)
+ }
+ }
}
/** Get any errors that have occurred */
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
index 3f0e65264662..2ea409e90672 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.streaming
import java.io.File
-import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutionException, Semaphore,
TimeUnit}
import scala.collection.mutable.ListBuffer
@@ -29,14 +29,14 @@ import org.scalatest.time.{Seconds, Span}
import org.apache.spark.TestUtils
import org.apache.spark.sql._
import org.apache.spark.sql.connector.read.streaming
-import org.apache.spark.sql.execution.streaming.checkpointing.{AsyncCommitLog,
AsyncOffsetSeqLog}
-import
org.apache.spark.sql.execution.streaming.runtime.{AsyncProgressTrackingMicroBatchExecution,
MemoryStream, StreamExecution}
+import org.apache.spark.sql.execution.streaming.checkpointing.{AsyncCommitLog,
AsyncOffsetSeqLog, CommitMetadata, OffsetSeq}
+import
org.apache.spark.sql.execution.streaming.runtime.{AsyncProgressTrackingMicroBatchExecution,
ErrorNotifier, LongOffset, MemoryStream, StreamExecution}
import
org.apache.spark.sql.execution.streaming.runtime.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
ASYNC_PROGRESS_TRACKING_ENABLED,
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
import org.apache.spark.sql.functions.{column, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{StreamingQuery,
StreamingQueryException, StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
class AsyncProgressTrackingMicroBatchExecutionSuite
extends StreamTest with BeforeAndAfter with Matchers {
@@ -749,7 +749,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
val e = intercept[StreamingQueryException] {
q.processAllAvailable()
}
- e.getCause.getCause.getMessage should include("Permission
denied")
+ TestUtils.assertExceptionMsg(e, "Permission denied")
}
}
)
@@ -774,6 +774,53 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
testAsyncWriteErrorsPermissionsIssue("/commits")
}
+ test("async log writes record first error and gate subsequent writes") {
+ val checkpointLocation = Utils.createTempDir(namePrefix =
"streaming.metadata").getCanonicalPath
+ val offsetsDir = new File(checkpointLocation + "/offsets")
+ val commitsDir = checkpointLocation + "/commits"
+ offsetsDir.mkdirs()
+
+ val executor =
ThreadUtils.newDaemonSingleThreadExecutor("async-log-write-test")
+ try {
+ val sharedNotifier = new ErrorNotifier()
+ val offsetLog = new AsyncOffsetSeqLog(
+ spark, offsetsDir.getAbsolutePath, executor, offsetCommitIntervalMs =
0,
+ errorNotifier = sharedNotifier)
+ val commitLog = new AsyncCommitLog(
+ spark, commitsDir, executor, errorNotifier = sharedNotifier)
+
+ offsetsDir.setReadOnly()
+ try {
+ val firstFuture = offsetLog.addAsync(0L, OffsetSeq.fill(LongOffset(0)))
+ val firstEx = intercept[ExecutionException] {
+ firstFuture.get(5, TimeUnit.SECONDS)
+ }
+ // In production this is done by the `.exceptionally` chain in
+ // AsyncProgressTrackingMicroBatchExecution.
+ sharedNotifier.markError(firstEx.getCause)
+ val firstError = sharedNotifier.getError().get
+
+ val commitFuture = commitLog.addAsync(0L, CommitMetadata())
+ val commitEx = intercept[ExecutionException] {
+ commitFuture.get(5, TimeUnit.SECONDS)
+ }
+ assert(commitEx.getCause eq firstError)
+ assert(getListOfFiles(commitsDir).isEmpty)
+
+ val secondOffsetFuture = offsetLog.addAsync(1L,
OffsetSeq.fill(LongOffset(1)))
+ val secondOffsetEx = intercept[ExecutionException] {
+ secondOffsetFuture.get(5, TimeUnit.SECONDS)
+ }
+ assert(secondOffsetEx.getCause eq firstError)
+ assert(sharedNotifier.getError().contains(firstError))
+ } finally {
+ offsetsDir.setWritable(true)
+ }
+ } finally {
+ executor.shutdownNow()
+ }
+ }
+
test("commit intervals happy path") {
val checkpointLocation = Utils.createTempDir(namePrefix =
"streaming.metadata").getCanonicalPath
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]