This is an automated email from the ASF dual-hosted git repository.

HeartSaVioR pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a87c133c2bef [SPARK-56720][SS] Fail subsequent async log writes after 
a prior failure in async progress tracking
a87c133c2bef is described below

commit a87c133c2bef4d1041775e42c833f4a2a7a28870
Author: Yuchen Liu <[email protected]>
AuthorDate: Fri May 15 12:04:21 2026 -0700

    [SPARK-56720][SS] Fail subsequent async log writes after a prior failure in 
async progress tracking
    
    ### What changes were proposed in this pull request?
    
    Pass the shared error recorder, `ErrorNotifier`, to both `AsyncOffsetSeqLog` 
and `AsyncCommitLog`. Once the first failure is recorded:
    
    - Subsequent offset and commit log write tasks short-circuit by failing 
with the original error without touching durable storage.
    - The first error is preserved via `compareAndSet(null, err)` so it is not 
overwritten by later cascading failures; later errors are attached to it as 
suppressed exceptions.
    
    The shared reference is owned by `AsyncProgressTrackingMicroBatchExecution` 
and threaded through `AsyncStreamingQueryCheckpointMetadata` to both async 
logs. The existing `.exceptionally` handlers in `markMicroBatchStart` / 
`markMicroBatchEnd` are routed through a new `recordAsyncWriteError` helper so 
the shared ref is also populated when the failure originates from `addAsync`'s 
own `thenApply` (e.g. `concurrentStreamLogUpdate`).
    
    ### Why are the changes needed?
    
    When async progress tracking is enabled, offset and commit log writes are 
submitted to a single-threaded executor service. If one async write task fails, 
follow-up writes already queued (or queued shortly afterward) can still proceed 
and persist files to durable storage, leaving inconsistent state on disk — for 
example a commit-log entry written without its corresponding offset-log entry.
    
    The original error can also be overwritten in the `ErrorNotifier` by a 
later cascading failure, so the user-visible exception masks the root cause 
(e.g. surfacing `concurrentStreamLogUpdate` instead of the actual `Permission 
denied` / `IOException` that started the cascade).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Async-progress-tracking queries that hit a write failure will now 
surface the root cause instead of a later cascading error, but the failure mode 
(query termination with a `StreamingQueryException`) is unchanged.
    
    ### How was this patch tested?
    
    Added a regression test in `AsyncProgressTrackingMicroBatchExecutionSuite` 
that triggers a real I/O failure on the first offset write and verifies:
    
    1. The shared first-error reference is populated.
    2. A follow-up commit-log write short-circuits with the original error and 
produces no commit file.
    3. A follow-up offset-log write for the next batch also short-circuits with 
the same first error.
    4. The shared error reference is not overwritten by later cascading 
failures.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor (Claude Opus 4.7)
    
    Closes #55676 from eason-yuchen-liu/ss-async-log-fail-fast.
    
    Authored-by: Yuchen Liu 
<[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../streaming/checkpointing/AsyncCommitLog.scala   | 15 +++++-
 .../checkpointing/AsyncOffsetSeqLog.scala          | 12 ++++-
 .../AsyncProgressTrackingMicroBatchExecution.scala |  3 +-
 .../AsyncStreamingQueryCheckpointMetadata.scala    | 12 +++--
 .../streaming/runtime/ErrorNotifier.scala          | 16 ++++--
 ...cProgressTrackingMicroBatchExecutionSuite.scala | 57 ++++++++++++++++++++--
 6 files changed, 101 insertions(+), 14 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala
index 116ea18326ef..7a6c26b249e9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala
@@ -25,11 +25,16 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.internal.LogKeys
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.streaming.runtime.ErrorNotifier
 
 /**
  * Implementation of CommitLog to perform asynchronous writes to storage
  */
-class AsyncCommitLog(sparkSession: SparkSession, path: String, 
executorService: ThreadPoolExecutor)
+class AsyncCommitLog(
+    sparkSession: SparkSession,
+    path: String,
+    executorService: ThreadPoolExecutor,
+    errorNotifier: ErrorNotifier = new ErrorNotifier())
   extends CommitLog(sparkSession, path) {
 
   // the cache needs to be enabled because we may not be persisting every 
entry to durable storage
@@ -109,6 +114,14 @@ class AsyncCommitLog(sparkSession: SparkSession, path: 
String, executorService:
       executorService.submit(new Runnable {
         override def run(): Unit = {
           try {
+            val priorError = errorNotifier.getError().orNull
+            if (priorError != null) {
+              logWarning(log"Skipping async commit write for batch " +
+                log"${MDC(LogKeys.BATCH_ID, batchId)} because a previous async 
" +
+                log"write task already failed", priorError)
+              future.completeExceptionally(priorError)
+              return
+            }
             if (fileManager.exists(batchMetadataFile)) {
               future.complete(false)
             } else {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncOffsetSeqLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncOffsetSeqLog.scala
index e6ba644ed483..eea10f3d5a93 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncOffsetSeqLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncOffsetSeqLog.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.internal.{LogKeys}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.streaming.runtime.ErrorNotifier
 import org.apache.spark.util.{Clock, SystemClock}
 
 /**
@@ -36,7 +37,8 @@ class AsyncOffsetSeqLog(
     path: String,
     executorService: ThreadPoolExecutor,
     offsetCommitIntervalMs: Long,
-    clock: Clock = new SystemClock())
+    clock: Clock = new SystemClock(),
+    errorNotifier: ErrorNotifier = new ErrorNotifier())
   extends OffsetSeqLog(sparkSession, path) {
 
   // the cache needs to be enabled because we may not be persisting every 
entry to durable storage
@@ -143,6 +145,14 @@ class AsyncOffsetSeqLog(
       executorService.submit(new Runnable {
         override def run(): Unit = {
           try {
+            val priorError = errorNotifier.getError().orNull
+            if (priorError != null) {
+              logWarning(log"Skipping async offset write for batch " +
+                log"${MDC(LogKeys.BATCH_ID, batchId)} because a previous async 
" +
+                log"write task already failed", priorError)
+              future.completeExceptionally(priorError)
+              return
+            }
             if (fileManager.exists(batchMetadataFile)) {
               future.complete(false)
             } else {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncProgressTrackingMicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncProgressTrackingMicroBatchExecution.scala
index 6bebad5ce54f..156f2b8b169a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncProgressTrackingMicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncProgressTrackingMicroBatchExecution.scala
@@ -94,7 +94,8 @@ class AsyncProgressTrackingMicroBatchExecution(
       resolvedCheckpointRoot,
       asyncWritesExecutorService,
       asyncProgressTrackingCheckpointingIntervalMs,
-      triggerClock
+      triggerClock,
+      errorNotifier
     )
 
   override lazy val offsetLog: AsyncOffsetSeqLog = 
asyncCheckpointMetadata.offsetLog
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncStreamingQueryCheckpointMetadata.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncStreamingQueryCheckpointMetadata.scala
index 65113ce15c39..39622392aa62 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncStreamingQueryCheckpointMetadata.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncStreamingQueryCheckpointMetadata.scala
@@ -30,13 +30,17 @@ import org.apache.spark.util.Clock
  * @param asyncWritesExecutorService The executor service for async writes
  * @param asyncProgressTrackingCheckpointingIntervalMs The interval for async 
progress
  * @param triggerClock The clock to use for trigger time
+ * @param errorNotifier Shared error sink between the offset and commit logs. 
Once a
+ *                      first error is recorded, subsequent async write tasks 
short-circuit
+ *                      with that error instead of writing to durable storage.
  */
 class AsyncStreamingQueryCheckpointMetadata(
     sparkSession: SparkSession,
     resolvedCheckpointRoot: String,
     asyncWritesExecutorService: ThreadPoolExecutor,
     asyncProgressTrackingCheckpointingIntervalMs: Long,
-    triggerClock: Clock)
+    triggerClock: Clock,
+    errorNotifier: ErrorNotifier)
   extends StreamingQueryCheckpointMetadata(sparkSession, 
resolvedCheckpointRoot) {
 
   override lazy val offsetLog = new AsyncOffsetSeqLog(
@@ -44,13 +48,15 @@ class AsyncStreamingQueryCheckpointMetadata(
     checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS),
     asyncWritesExecutorService,
     asyncProgressTrackingCheckpointingIntervalMs,
-    clock = triggerClock
+    clock = triggerClock,
+    errorNotifier = errorNotifier
   )
 
   override lazy val commitLog = new AsyncCommitLog(
     sparkSession,
     checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS),
-    asyncWritesExecutorService
+    asyncWritesExecutorService,
+    errorNotifier = errorNotifier
   )
 
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ErrorNotifier.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ErrorNotifier.scala
index 19867c516837..2ba696173954 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ErrorNotifier.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ErrorNotifier.scala
@@ -28,10 +28,20 @@ class ErrorNotifier extends Logging {
 
   private val error = new AtomicReference[Throwable]
 
-  /** To indicate any errors that have occurred */
+  /**
+   * Record a fatal error. Only the first error is retained; later errors are
+   * attached to it as suppressed, so cascading failures cannot mask the original cause.
+   */
   def markError(th: Throwable): Unit = {
-    logError("A fatal error has occurred.", th)
-    error.set(th)
+    if (error.compareAndSet(null, th)) {
+      logError("A fatal error has occurred.", th)
+    } else {
+      // Attach subsequent errors as suppressed so they're not silently lost.
+      val existing = error.get()
+      if (existing != null && existing != th) {
+        existing.addSuppressed(th)
+      }
+    }
   }
 
   /** Get any errors that have occurred */
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
index 3f0e65264662..2ea409e90672 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io.File
-import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutionException, Semaphore, 
TimeUnit}
 
 import scala.collection.mutable.ListBuffer
 
@@ -29,14 +29,14 @@ import org.scalatest.time.{Seconds, Span}
 import org.apache.spark.TestUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.connector.read.streaming
-import org.apache.spark.sql.execution.streaming.checkpointing.{AsyncCommitLog, 
AsyncOffsetSeqLog}
-import 
org.apache.spark.sql.execution.streaming.runtime.{AsyncProgressTrackingMicroBatchExecution,
 MemoryStream, StreamExecution}
+import org.apache.spark.sql.execution.streaming.checkpointing.{AsyncCommitLog, 
AsyncOffsetSeqLog, CommitMetadata, OffsetSeq}
+import 
org.apache.spark.sql.execution.streaming.runtime.{AsyncProgressTrackingMicroBatchExecution,
 ErrorNotifier, LongOffset, MemoryStream, StreamExecution}
 import 
org.apache.spark.sql.execution.streaming.runtime.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
 import org.apache.spark.sql.functions.{column, window}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
 import org.apache.spark.sql.streaming.util.StreamManualClock
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 class AsyncProgressTrackingMicroBatchExecutionSuite
   extends StreamTest with BeforeAndAfter with Matchers {
@@ -749,7 +749,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
               val e = intercept[StreamingQueryException] {
                 q.processAllAvailable()
               }
-              e.getCause.getCause.getMessage should include("Permission 
denied")
+              TestUtils.assertExceptionMsg(e, "Permission denied")
             }
         }
       )
@@ -774,6 +774,53 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
     testAsyncWriteErrorsPermissionsIssue("/commits")
   }
 
+  test("async log writes record first error and gate subsequent writes") {
+    val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+    val offsetsDir = new File(checkpointLocation + "/offsets")
+    val commitsDir = checkpointLocation + "/commits"
+    offsetsDir.mkdirs()
+
+    val executor = 
ThreadUtils.newDaemonSingleThreadExecutor("async-log-write-test")
+    try {
+      val sharedNotifier = new ErrorNotifier()
+      val offsetLog = new AsyncOffsetSeqLog(
+        spark, offsetsDir.getAbsolutePath, executor, offsetCommitIntervalMs = 
0,
+        errorNotifier = sharedNotifier)
+      val commitLog = new AsyncCommitLog(
+        spark, commitsDir, executor, errorNotifier = sharedNotifier)
+
+      offsetsDir.setReadOnly()
+      try {
+        val firstFuture = offsetLog.addAsync(0L, OffsetSeq.fill(LongOffset(0)))
+        val firstEx = intercept[ExecutionException] {
+          firstFuture.get(5, TimeUnit.SECONDS)
+        }
+        // In production this is done by the `.exceptionally` chain in
+        // AsyncProgressTrackingMicroBatchExecution.
+        sharedNotifier.markError(firstEx.getCause)
+        val firstError = sharedNotifier.getError().get
+
+        val commitFuture = commitLog.addAsync(0L, CommitMetadata())
+        val commitEx = intercept[ExecutionException] {
+          commitFuture.get(5, TimeUnit.SECONDS)
+        }
+        assert(commitEx.getCause eq firstError)
+        assert(getListOfFiles(commitsDir).isEmpty)
+
+        val secondOffsetFuture = offsetLog.addAsync(1L, 
OffsetSeq.fill(LongOffset(1)))
+        val secondOffsetEx = intercept[ExecutionException] {
+          secondOffsetFuture.get(5, TimeUnit.SECONDS)
+        }
+        assert(secondOffsetEx.getCause eq firstError)
+        assert(sharedNotifier.getError().contains(firstError))
+      } finally {
+        offsetsDir.setWritable(true)
+      }
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
   test("commit intervals happy path") {
 
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to