This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5781-0eb8427af94e8c28ad8f4cf673cfc5404e708c8e in repository https://gitbox.apache.org/repos/asf/texera.git
commit 1c580e59eb69bc45205298606c3980c67a05803f Author: Xinyuan Lin <[email protected]> AuthorDate: Tue Jun 23 19:45:12 2026 -0700 fix(execution-service): surface init-time fatal errors to the websocket (#5781) ### What changes were proposed in this PR? When workflow execution initialization failed, the error was recorded into the execution metadata store but never pushed to the websocket, so connected frontend clients saw nothing — particularly for failures during `WorkflowExecutionService` construction, which happens *before* the execution is published to subscribers. `WorkflowService.initExecutionService`'s catch arm now, after `errorHandler(e)` records the fatal error, pushes a `WorkflowErrorEvent` (carrying the recorded fatal errors) to `errorSubject` — the workflow-level channel that `connect()` subscribers listen on — so init-time failures surface in the UI. The direct push happens only when the execution was never published (i.e. the failure occurred before `executionService.onNext`); once the execution is published, the existing state-store diff handler already emits the `WorkflowErrorEvent`, so the catch arm skips the push to avoid a double emit. | init failure | before | after | |---|---|---| | during `WorkflowExecutionService` construction (pre-publish) | logged + stored, invisible to the UI | `WorkflowErrorEvent` delivered to the frontend | | during `executeWorkflow()` | recorded; UI delivery depended on subscription timing | `WorkflowErrorEvent` delivered to the frontend | The push is extracted into a small `reportFatalErrorsToSubscribers` method so it can be unit-tested without a database (the init path itself is DB-bound). ### Any related issues, documentation, discussions? Resolves #5782. Discovered while splitting #5700 (loop operators) into smaller PRs; this fix is independent of that feature and applies to `main` on its own. ### How was this PR tested? New `WorkflowServiceSpec` (TDD, red → green): pins that `reportFatalErrorsToSubscribers` delivers a `WorkflowErrorEvent` to a `connect()` subscriber carrying exactly the fatal errors recorded in the execution state store (single error, and all errors when several are present). `sbt "WorkflowExecutionService/testOnly *WorkflowServiceSpec"` passes (2/2); scalafmt + scalafix clean.
### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF. --- .../texera/web/service/WorkflowService.scala | 35 ++++++++- .../texera/web/service/WorkflowServiceSpec.scala | 85 ++++++++++++++++++++++ 2 files changed, 118 insertions(+), 2 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index a241121da2..90934287eb 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -50,7 +50,7 @@ import org.apache.texera.amber.error.ErrorUtils.{ } import org.apache.texera.dao.jooq.generated.tables.pojos.User import org.apache.texera.service.util.LargeBinaryManager -import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent +import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, WorkflowErrorEvent} import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import org.apache.texera.web.service.WorkflowService.mkWorkflowStateId @@ -277,6 +277,14 @@ class WorkflowService( } } } + // Once the execution is published via `executionService.onNext`, the normal + // state-store path surfaces fatal errors to the UI: `errorHandler` writes + // them into `executionStateStore.metadataStore`, whose diff handler (set up + // in the WorkflowExecutionService constructor) emits a WorkflowErrorEvent + // that `connectToExecution` forwards. Before that point, neither the emitter + // nor a subscriber exists yet, so a failure in the constructor itself would + // be recorded but never reach the frontend -- see the fallback in `catch`. 
+ var executionPublished = false try { val execution = new WorkflowExecutionService( controllerConf, @@ -290,13 +298,36 @@ class WorkflowService( ) lifeCycleManager.registerCleanUpOnStateChange(executionStateStore) executionService.onNext(execution) + executionPublished = true execution.executeWorkflow() } catch { - case e: Throwable => errorHandler(e) + case e: Throwable => + errorHandler(e) + // If the execution was never published, no `connectToExecution` + // subscriber is bound to `executionStateStore`, so the state-store path + // above cannot deliver the error. Push it directly in that pre-publish + // window only; once published, the state-store path already surfaces it + // (pushing here too would double-emit). + if (!executionPublished) { + reportFatalErrorsToSubscribers(executionStateStore) + } } } + /** + * Push the fatal errors currently recorded in `stateStore` to connected + * websocket subscribers (via `errorSubject`). + * + * Fallback used only when execution initialization fails before the execution + * is published (e.g. the WorkflowExecutionService constructor throws): in that + * window the per-execution state store has no diff-handler emitter and no + * websocket subscriber, so the error -- already recorded by `errorHandler` -- + * would otherwise be logged but never reach the frontend. 
+ */ + private[service] def reportFatalErrorsToSubscribers(stateStore: ExecutionStateStore): Unit = + errorSubject.onNext(WorkflowErrorEvent(stateStore.metadataStore.getState.fatalErrors)) + def convertToJson(frontendVersion: String): String = { val environmentVersionMap = Map( "engine_version" -> Json.toJson(frontendVersion) diff --git a/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala b/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala new file mode 100644 index 0000000000..7c1d879c93 --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.web.service + +import com.google.protobuf.timestamp.Timestamp +import org.apache.texera.amber.core.virtualidentity.WorkflowIdentity +import org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE +import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError +import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, WorkflowErrorEvent} +import org.apache.texera.web.storage.ExecutionStateStore +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.time.Instant +import scala.collection.mutable.ArrayBuffer + +/** + * Unit tests for `WorkflowService.reportFatalErrorsToSubscribers`, the seam + * that surfaces init-time fatal errors to the websocket. When execution + * initialization fails, the error is recorded in the metadata store; this push + * is what makes it visible to connected clients instead of only logged. + */ +class WorkflowServiceSpec extends AnyFlatSpec with Matchers { + + private def fatalError(message: String): WorkflowFatalError = + WorkflowFatalError(EXECUTION_FAILURE, Timestamp(Instant.now), message, "", "", "") + + /** A WorkflowService with a subscriber collecting every event it pushes. 
*/ + private def serviceWithCollector(): (WorkflowService, ArrayBuffer[TexeraWebSocketEvent]) = { + val service = new WorkflowService(WorkflowIdentity(1), computingUnitId = 1, cleanUpTimeout = 30) + val events = ArrayBuffer.empty[TexeraWebSocketEvent] + service.connect(evt => events += evt) + (service, events) + } + + private def errorEventsIn(events: ArrayBuffer[TexeraWebSocketEvent]): Seq[WorkflowErrorEvent] = + events.collect { case e: WorkflowErrorEvent => e }.toSeq + + "WorkflowService" should + "push a WorkflowErrorEvent carrying the store's fatal error to connected subscribers" in { + val (service, events) = serviceWithCollector() + val store = new ExecutionStateStore() + val err = fatalError("boom during init") + store.metadataStore.updateState(_.addFatalErrors(err)) + + service.reportFatalErrorsToSubscribers(store) + + val errorEvents = errorEventsIn(events) + errorEvents should have size 1 + // Forwards exactly the store's fatal errors -- no more, no less. + errorEvents.head.fatalErrors should contain theSameElementsAs Seq(err) + } + + it should "carry every fatal error currently recorded in the store" in { + val (service, events) = serviceWithCollector() + val store = new ExecutionStateStore() + val first = fatalError("first") + val second = fatalError("second") + store.metadataStore.updateState(_.addFatalErrors(first).addFatalErrors(second)) + + service.reportFatalErrorsToSubscribers(store) + + val errorEvents = errorEventsIn(events) + errorEvents should have size 1 + // Exactly the two recorded errors -- no extras. + errorEvents.head.fatalErrors should contain theSameElementsAs Seq(first, second) + } +}
