This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5781-0eb8427af94e8c28ad8f4cf673cfc5404e708c8e
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 1c580e59eb69bc45205298606c3980c67a05803f
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jun 23 19:45:12 2026 -0700

    fix(execution-service): surface init-time fatal errors to the websocket (#5781)
    
    ### What changes were proposed in this PR?
    
    Previously, when workflow execution initialization failed, the error was
    recorded in the execution metadata store but could fail to reach the
    websocket, leaving connected frontend clients with no feedback. This always
    happened for failures during `WorkflowExecutionService` construction, which
    occurs *before* the execution is published to subscribers: at that point
    neither the state store's diff-handler emitter nor a websocket subscriber
    exists yet.
    
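    `WorkflowService.initExecutionService` now tracks whether the execution
    has been published (`executionPublished`). If initialization fails before
    publication, the catch arm — after `errorHandler(e)` records the fatal
    error — pushes a `WorkflowErrorEvent` (carrying the recorded fatal errors)
    to `errorSubject`, the workflow-level channel that `connect()` subscribers
    listen on, so init-time failures surface in the UI. Failures after
    publication are left to the existing state-store path (the metadata
    store's diff handler emits the event and `connectToExecution` forwards
    it); pushing directly in that case as well would double-emit.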
    
    | init failure | before | after |
    |---|---|---|
    | during `WorkflowExecutionService` construction (pre-publish) | logged + stored, invisible to the UI | `WorkflowErrorEvent` pushed directly to `connect()` subscribers |
    | during `executeWorkflow()` (post-publish) | recorded; UI delivery depended on subscription timing | unchanged: still delivered via the state-store path (no direct push, to avoid double-emitting) |
    
    The push is extracted into a small `reportFatalErrorsToSubscribers`
    method so it can be unit-tested without a database (the init path itself
    is DB-bound).
    
    ### Any related issues, documentation, discussions?
    
    Resolves #5782. Discovered while splitting #5700 (loop operators) into
    smaller PRs; this fix is independent of that feature and applies to
    `main` on its own.
    
    ### How was this PR tested?
    
    New `WorkflowServiceSpec` (TDD, red → green): pins that
    `reportFatalErrorsToSubscribers` delivers a `WorkflowErrorEvent` to a
    `connect()` subscriber carrying exactly the fatal errors recorded in the
    execution state store (single error, and all errors when several are
    present). The `executionPublished` guard in the catch arm is not covered
    by these tests, because exercising it requires the DB-bound init path.
    `sbt "WorkflowExecutionService/testOnly *WorkflowServiceSpec"` passes
    (2/2); scalafmt + scalafix clean.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Co-authored with Claude Opus 4.8 in compliance with ASF.
---
 .../texera/web/service/WorkflowService.scala       | 35 ++++++++-
 .../texera/web/service/WorkflowServiceSpec.scala   | 85 ++++++++++++++++++++++
 2 files changed, 118 insertions(+), 2 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index a241121da2..90934287eb 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -50,7 +50,7 @@ import org.apache.texera.amber.error.ErrorUtils.{
 }
 import org.apache.texera.dao.jooq.generated.tables.pojos.User
 import org.apache.texera.service.util.LargeBinaryManager
-import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
+import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, 
WorkflowErrorEvent}
 import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
 import org.apache.texera.web.service.WorkflowService.mkWorkflowStateId
@@ -277,6 +277,14 @@ class WorkflowService(
         }
       }
     }
+    // Once the execution is published via `executionService.onNext`, the 
normal
+    // state-store path surfaces fatal errors to the UI: `errorHandler` writes
+    // them into `executionStateStore.metadataStore`, whose diff handler (set 
up
+    // in the WorkflowExecutionService constructor) emits a WorkflowErrorEvent
+    // that `connectToExecution` forwards. Before that point, neither the 
emitter
+    // nor a subscriber exists yet, so a failure in the constructor itself 
would
+    // be recorded but never reach the frontend -- see the fallback in `catch`.
+    var executionPublished = false
     try {
       val execution = new WorkflowExecutionService(
         controllerConf,
@@ -290,13 +298,36 @@ class WorkflowService(
       )
       lifeCycleManager.registerCleanUpOnStateChange(executionStateStore)
       executionService.onNext(execution)
+      executionPublished = true
       execution.executeWorkflow()
     } catch {
-      case e: Throwable => errorHandler(e)
+      case e: Throwable =>
+        errorHandler(e)
+        // If the execution was never published, no `connectToExecution`
+        // subscriber is bound to `executionStateStore`, so the state-store 
path
+        // above cannot deliver the error. Push it directly in that pre-publish
+        // window only; once published, the state-store path already surfaces 
it
+        // (pushing here too would double-emit).
+        if (!executionPublished) {
+          reportFatalErrorsToSubscribers(executionStateStore)
+        }
     }
 
   }
 
+  /**
+    * Push the fatal errors currently recorded in `stateStore` to connected
+    * websocket subscribers (via `errorSubject`).
+    *
+    * Fallback used only when execution initialization fails before the 
execution
+    * is published (e.g. the WorkflowExecutionService constructor throws): in 
that
+    * window the per-execution state store has no diff-handler emitter and no
+    * websocket subscriber, so the error -- already recorded by `errorHandler` 
--
+    * would otherwise be logged but never reach the frontend.
+    */
+  private[service] def reportFatalErrorsToSubscribers(stateStore: 
ExecutionStateStore): Unit =
+    
errorSubject.onNext(WorkflowErrorEvent(stateStore.metadataStore.getState.fatalErrors))
+
   def convertToJson(frontendVersion: String): String = {
     val environmentVersionMap = Map(
       "engine_version" -> Json.toJson(frontendVersion)
diff --git 
a/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala 
b/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala
new file mode 100644
index 0000000000..7c1d879c93
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.service
+
+import com.google.protobuf.timestamp.Timestamp
+import org.apache.texera.amber.core.virtualidentity.WorkflowIdentity
+import 
org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE
+import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError
+import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, 
WorkflowErrorEvent}
+import org.apache.texera.web.storage.ExecutionStateStore
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.time.Instant
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Unit tests for `WorkflowService.reportFatalErrorsToSubscribers`, the seam
+  * that surfaces init-time fatal errors to the websocket. When execution
+  * initialization fails, the error is recorded in the metadata store; this 
push
+  * is what makes it visible to connected clients instead of only logged.
+  */
+class WorkflowServiceSpec extends AnyFlatSpec with Matchers {
+
+  private def fatalError(message: String): WorkflowFatalError =
+    WorkflowFatalError(EXECUTION_FAILURE, Timestamp(Instant.now), message, "", 
"", "")
+
+  /** A WorkflowService with a subscriber collecting every event it pushes. */
+  private def serviceWithCollector(): (WorkflowService, 
ArrayBuffer[TexeraWebSocketEvent]) = {
+    val service = new WorkflowService(WorkflowIdentity(1), computingUnitId = 
1, cleanUpTimeout = 30)
+    val events = ArrayBuffer.empty[TexeraWebSocketEvent]
+    service.connect(evt => events += evt)
+    (service, events)
+  }
+
+  private def errorEventsIn(events: ArrayBuffer[TexeraWebSocketEvent]): 
Seq[WorkflowErrorEvent] =
+    events.collect { case e: WorkflowErrorEvent => e }.toSeq
+
+  "WorkflowService" should
+    "push a WorkflowErrorEvent carrying the store's fatal error to connected 
subscribers" in {
+    val (service, events) = serviceWithCollector()
+    val store = new ExecutionStateStore()
+    val err = fatalError("boom during init")
+    store.metadataStore.updateState(_.addFatalErrors(err))
+
+    service.reportFatalErrorsToSubscribers(store)
+
+    val errorEvents = errorEventsIn(events)
+    errorEvents should have size 1
+    // Forwards exactly the store's fatal errors -- no more, no less.
+    errorEvents.head.fatalErrors should contain theSameElementsAs Seq(err)
+  }
+
+  it should "carry every fatal error currently recorded in the store" in {
+    val (service, events) = serviceWithCollector()
+    val store = new ExecutionStateStore()
+    val first = fatalError("first")
+    val second = fatalError("second")
+    
store.metadataStore.updateState(_.addFatalErrors(first).addFatalErrors(second))
+
+    service.reportFatalErrorsToSubscribers(store)
+
+    val errorEvents = errorEventsIn(events)
+    errorEvents should have size 1
+    // Exactly the two recorded errors -- no extras.
+    errorEvents.head.fatalErrors should contain theSameElementsAs Seq(first, 
second)
+  }
+}

Reply via email to