This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 34b004d012 fix(amber): surface writer-thread failure as FatalError instead of silent hang (#4683)
34b004d012 is described below

commit 34b004d012cc6a87fc2611a666aa1376d159c462
Author: Yicong Huang <[email protected]>
AuthorDate: Tue May 5 23:54:12 2026 -0700

    fix(amber): surface writer-thread failure as FatalError instead of silent hang (#4683)
    
    ### What changes were proposed in this PR?
    
    When `OutputPortResultWriterThread.run()` throws (e.g., because the Iceberg
    commit-retry budget is exhausted), the writer thread dies silently and the
    worker still reports `portCompleted` to the controller. The user then sees a
    1-minute completion timeout with nothing pointing at the Iceberg failure.
    
    Capture the failure on the writer thread, re-throw it from
    `OutputManager.closeOutputStorageWriterIfNeeded`, and let the existing
    DP-thread → worker-actor → controller-supervisor path surface it as a
    `FatalError` to the client.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4682.
    
    ### How was this PR tested?
    
    `OutputPortResultWriterThreadSpec` (6 tests) covers a clean run, a putOne
    failure (close() still runs), a close() failure, both failing (the close()
    exception is attached as suppressed to the putOne exception), and the
    re-throw and no-op cases of `OutputManager.closeOutputStorageWriterIfNeeded`.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7, 1M context)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../messaginglayer/OutputManager.scala             |   6 +
 .../managers/OutputPortResultWriterThread.scala    |  38 +++++-
 .../OutputPortResultWriterThreadSpec.scala         | 151 +++++++++++++++++++++
 3 files changed, 188 insertions(+), 7 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 4ab3d18056..affbd786f9 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -235,6 +235,11 @@ class OutputManager(
   /**
     * Singal the port storage writer to flush the remaining buffer and wait 
for commits to finish so that
     * the output port is properly completed. If the output port does not need 
storage, no action will be done.
+    *
+    * If the writer thread captured a failure (e.g., iceberg commit retries
+    * exhausted), re-throw it here so the DP thread surfaces a FatalError
+    * to the controller via pekko's supervisor strategy. Otherwise the worker
+    * would announce port completion as if the result was durably written.
     */
   def closeOutputStorageWriterIfNeeded(outputPortId: PortIdentity): Unit = {
     this.outputPortResultWriterThreads.get(outputPortId) match {
@@ -243,6 +248,7 @@ class OutputManager(
         writerThread.queue.put(Right(PortStorageWriterTerminateSignal))
         // Blocking call
         writerThread.join()
+        writerThread.getFailure.foreach(throw _)
       case None =>
     }
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
index 28e5d2af66..4223d920da 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
@@ -24,6 +24,7 @@ import 
org.apache.texera.amber.core.storage.model.BufferedItemWriter
 import org.apache.texera.amber.core.tuple.Tuple
 
 import java.util.concurrent.LinkedBlockingQueue
+import scala.util.control.NonFatal
 
 sealed trait TerminateSignal
 case object PortStorageWriterTerminateSignal extends TerminateSignal
@@ -35,15 +36,38 @@ class OutputPortResultWriterThread(
   val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] =
     Queues.newLinkedBlockingQueue[Either[Tuple, TerminateSignal]]()
 
+  // Captured failure from put-one or close() so the worker DP thread can
+  // re-throw and let the controller's pekko supervisor surface a FatalError
+  // to the client. Without this, the writer thread dies silently and the
+  // worker keeps reporting normal port completion to the controller while
+  // results are missing or stale, leading to e2e timeouts that hide the
+  // real cause.
+  @volatile private var failure: Option[Throwable] = None
+  def getFailure: Option[Throwable] = failure
+
   override def run(): Unit = {
-    var internalStop = false
-    while (!internalStop) {
-      val queueContent = queue.take()
-      queueContent match {
-        case Left(tuple) => bufferedItemWriter.putOne(tuple)
-        case Right(_)    => internalStop = true
+    try {
+      var internalStop = false
+      while (!internalStop) {
+        queue.take() match {
+          case Left(tuple) => bufferedItemWriter.putOne(tuple)
+          case Right(_)    => internalStop = true
+        }
+      }
+    } catch {
+      case NonFatal(e) => failure = Some(e)
+    } finally {
+      // close() runs even when the loop threw, so a putOne failure does
+      // not leak the underlying writer's file handles. If both legs fail,
+      // attach close()'s exception as suppressed on the original.
+      try bufferedItemWriter.close()
+      catch {
+        case NonFatal(e) =>
+          failure match {
+            case Some(orig) => orig.addSuppressed(e)
+            case None       => failure = Some(e)
+          }
       }
     }
-    bufferedItemWriter.close()
   }
 }
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala
new file mode 100644
index 0000000000..31d8c41611
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.worker.managers
+
+import org.apache.texera.amber.core.storage.model.BufferedItemWriter
+import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+  NetworkOutputGateway,
+  OutputManager
+}
+import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.collection.mutable
+
+class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
+
+  private class StubWriter(
+      onPutOne: () => Unit = () => (),
+      onClose: () => Unit = () => ()
+  ) extends BufferedItemWriter[Tuple] {
+    val bufferSize: Int = 1024
+    var closeCalled = false
+    def open(): Unit = ()
+    def putOne(item: Tuple): Unit = onPutOne()
+    def removeOne(item: Tuple): Unit = ()
+    def close(): Unit = {
+      closeCalled = true
+      onClose()
+    }
+  }
+
+  private def throwing(msg: String): () => Unit = () => throw new 
RuntimeException(msg)
+
+  "OutputPortResultWriterThread" should "leave getFailure empty on a clean 
run" in {
+    val writer = new StubWriter()
+    val thread = new OutputPortResultWriterThread(writer)
+    thread.start()
+    thread.queue.put(Right(PortStorageWriterTerminateSignal))
+    thread.join()
+    assert(thread.getFailure.isEmpty)
+    assert(writer.closeCalled)
+  }
+
+  it should "capture a close() exception in getFailure so the worker can 
re-throw" in {
+    val writer = new StubWriter(onClose = throwing("test close failure"))
+    val thread = new OutputPortResultWriterThread(writer)
+    thread.start()
+    thread.queue.put(Right(PortStorageWriterTerminateSignal))
+    thread.join()
+    assert(thread.getFailure.exists(_.getMessage.contains("test close 
failure")))
+    assert(writer.closeCalled)
+  }
+
+  it should "capture a putOne exception and still call close()" in {
+    val writer = new StubWriter(onPutOne = throwing("test putOne failure"))
+    val thread = new OutputPortResultWriterThread(writer)
+    thread.start()
+    thread.queue.put(Left(null.asInstanceOf[Tuple]))
+    thread.queue.put(Right(PortStorageWriterTerminateSignal))
+    thread.join()
+    assert(thread.getFailure.exists(_.getMessage.contains("test putOne 
failure")))
+    // The finally clause must run close() even after putOne threw, or
+    // the underlying writer leaks file handles.
+    assert(writer.closeCalled)
+  }
+
+  it should "preserve both errors when putOne and close() fail in the same 
run" in {
+    val writer = new StubWriter(
+      onPutOne = throwing("test putOne failure"),
+      onClose = throwing("test close failure")
+    )
+    val thread = new OutputPortResultWriterThread(writer)
+    thread.start()
+    thread.queue.put(Left(null.asInstanceOf[Tuple]))
+    thread.queue.put(Right(PortStorageWriterTerminateSignal))
+    thread.join()
+    val captured = thread.getFailure.getOrElse(fail("expected putOne failure"))
+    assert(captured.getMessage.contains("test putOne failure"))
+    assert(
+      captured.getSuppressed.exists(_.getMessage.contains("test close 
failure")),
+      "close() failure should be attached as suppressed on the original putOne 
failure"
+    )
+  }
+
+  // Reach into OutputManager's private outputPortResultWriterThreads map to
+  // install a writer thread whose close() has already failed. This pins the
+  // contract that closeOutputStorageWriterIfNeeded re-throws the captured
+  // failure, which is the bridge from the writer thread to the DP thread →
+  // worker actor → controller supervisor → FatalError to client.
+  private def installWriterThread(
+      manager: OutputManager,
+      portId: PortIdentity,
+      thread: OutputPortResultWriterThread
+  ): Unit = {
+    val field = classOf[OutputManager]
+      .getDeclaredField("outputPortResultWriterThreads")
+    field.setAccessible(true)
+    field
+      .get(manager)
+      .asInstanceOf[mutable.HashMap[PortIdentity, 
OutputPortResultWriterThread]]
+      .put(portId, thread)
+  }
+
+  "OutputManager.closeOutputStorageWriterIfNeeded" should
+    "re-throw the writer thread's captured failure" in {
+    val identifier = ActorVirtualIdentity("test-worker")
+    val outputManager = new OutputManager(
+      identifier,
+      new NetworkOutputGateway(identifier, (_: WorkflowFIFOMessage) => ())
+    )
+    val portId = PortIdentity()
+    val failingWriter = new StubWriter(onClose = throwing("test close 
failure"))
+    val failingThread = new OutputPortResultWriterThread(failingWriter)
+    failingThread.start()
+    installWriterThread(outputManager, portId, failingThread)
+    val ex = intercept[RuntimeException] {
+      outputManager.closeOutputStorageWriterIfNeeded(portId)
+    }
+    assert(ex.getMessage.contains("test close failure"))
+  }
+
+  it should "be a no-op when the port has no writer thread" in {
+    val identifier = ActorVirtualIdentity("test-worker")
+    val outputManager = new OutputManager(
+      identifier,
+      new NetworkOutputGateway(identifier, (_: WorkflowFIFOMessage) => ())
+    )
+    // No installWriterThread call — the port has never had a writer.
+    outputManager.closeOutputStorageWriterIfNeeded(PortIdentity())
+  }
+}

Reply via email to