This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5784-e270f830be7d11a59a8357eeecec2251d642c872 in repository https://gitbox.apache.org/repos/asf/texera.git
commit 6433e713a08606eb952581828e8f9c360a763013 Author: Tanishq Gandhi <[email protected]> AuthorDate: Tue Jun 23 12:33:44 2026 -0700 fix(amber): surface real cause when output port schema is unavailable (#5784) ### What changes were proposed in this PR? When a workflow run fails because an output port's schema can't be resolved, the engine threw a generic `IllegalStateException("Schema is missing")`, discarding the actual cause. This typically happens when a dataset used by the workflow has not been shared with the user running it, but nothing in the message says so. Root cause: in `RegionExecutionCoordinator.createOutputPortStorageObjects`, the output port schema is an `Either[Throwable, Schema]`, but it was unwrapped with `Either.getOrElse(throw new IllegalStateException("Schema is missing"))`. Because `Either` is right-biased, `getOrElse` on a `Left(cause)` evaluates the default (here, the `throw`) and discards the real cause. ``` Before: Left(cause: "no access to dataset X") --getOrElse(throw)--> "Schema is missing" After: Left(cause) --> IllegalStateException("Failed to resolve the output schema: " + cause.msg, cause) ``` The fix pattern-matches on the `Either` and, on `Left`, logs the failure and throws an `IllegalStateException`. That exception includes the underlying message, falling back to the throwable's `toString` when the message is null, and chains the original throwable as its cause. **Before / After** (the message surfaces in the workspace Result Panel → Static Error frame): | | Message | |---|---| | **Before** | Schema is missing | | **After** | Failed to resolve the output schema: <real cause> (e.g. …: User has no access to this dataset) | ### Any related issues, documentation, discussions? Closes #3546 ### How was this PR tested? Added two regression tests in `RegionExecutionCoordinatorSpec`. The first seeds an output port with a `Left(cause)` schema. It asserts that the coordinator throws an `IllegalStateException` whose `getCause` is the original cause and whose message contains the underlying message. The second uses a cause with a null message (a `NullPointerException`). It asserts that the surfaced message falls back to the throwable's string form instead of ending in "null". The existing positive-path tests in the same spec continue to pass. 
``` sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.RegionExecutionCoordinatorSpec" ... [info] Total number of tests run: 3 [info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (Claude Opus 4.8) --- .../scheduling/RegionExecutionCoordinator.scala | 16 +++- .../RegionExecutionCoordinatorSpec.scala | 97 +++++++++++++++++++++- 2 files changed, 109 insertions(+), 4 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 4497d7c4ae..aba6eff759 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -572,10 +572,20 @@ class RegionExecutionCoordinator( val portBaseURI = portConfig.storageURIBase val resultURI = VFSURIFactory.resultURI(portBaseURI) val stateURI = VFSURIFactory.stateURI(portBaseURI) - val schemaOptional = - region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 val schema = - schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) + region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 match { + case Right(resolvedSchema) => resolvedSchema + case Left(cause) => + // The output port schema failed to resolve (e.g. a dataset the workflow reads is not + // shared with the running user, making its file and inferred schema unavailable). + // Surface the underlying cause instead of a generic "Schema is missing" (issue #3546). 
+ val reason = Option(cause.getMessage).getOrElse(cause.toString) + logger.error(s"Output schema unavailable for port $outputPortId", cause) + throw new IllegalStateException( + s"Failed to resolve the output schema: $reason", + cause + ) + } // An output port whose storage accumulates across region re-executions // (e.g. a LoopEnd port, whose output builds up over the iterations of // its own loop) sets `reuseStorage`. When set, the port's existing diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala index 9e6cb227e5..2c663612c4 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala @@ -22,13 +22,29 @@ package org.apache.texera.amber.engine.architecture.scheduling import com.twitter.util.Future import org.apache.pekko.actor.ActorSystem import org.apache.pekko.testkit.TestKit +import org.apache.texera.amber.core.storage.VFSURIFactory import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} -import org.apache.texera.amber.core.workflow.PhysicalOp +import org.apache.texera.amber.core.workflow.{ + GlobalPortIdentity, + OutputPort, + PhysicalOp, + PortIdentity +} +import org.apache.texera.amber.core.workflow.WorkflowContext.{ + DEFAULT_EXECUTION_ID, + DEFAULT_WORKFLOW_ID +} import org.apache.texera.amber.engine.architecture.common.PekkoActorRefMappingService import org.apache.texera.amber.engine.architecture.controller.ControllerConfig import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution import org.apache.texera.amber.engine.architecture.rpc.controlreturns._ import 
org.apache.texera.amber.engine.architecture.scheduling.RegionCoordinatorTestSupport._ +import org.apache.texera.amber.engine.architecture.scheduling.config.{ + OperatorConfig, + OutputPortConfig, + ResourceConfig, + WorkerConfig +} import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState import org.apache.texera.amber.engine.common.AmberRuntime import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER @@ -120,6 +136,85 @@ class RegionExecutionCoordinatorSpec assert(workerState(fixture) == WorkerState.TERMINATED) } + it should "surface the underlying cause when an output port schema is unavailable" in { + // Reproduces issue #3546: when schema inference for an output port fails (e.g. because a + // dataset used by the workflow has not been shared with the running user), the port's + // schema is stored as a `Left(cause)`. The coordinator must surface that real cause rather + // than discarding it behind a generic "Schema is missing" message. + val cause = new RuntimeException("User texera1 has no access to dataset 'iris'") + val coordinator = coordinatorWithUnresolvedOutputSchema(cause) + + val thrown = intercept[IllegalStateException] { + await(coordinator.syncStatusAndTransitionRegionExecutionPhase()) + } + assert(thrown.getCause eq cause) + assert(thrown.getMessage.contains(cause.getMessage)) + } + + it should "fall back to the throwable's string form when the cause has no message" in { + // Some throwables (e.g. NullPointerException) carry a null message; the surfaced text must + // not read "...: null". 
+ val cause = new NullPointerException() + assert(cause.getMessage == null) + val coordinator = coordinatorWithUnresolvedOutputSchema(cause) + + val thrown = intercept[IllegalStateException] { + await(coordinator.syncStatusAndTransitionRegionExecutionPhase()) + } + assert(thrown.getCause eq cause) + assert(thrown.getMessage.contains(cause.toString)) + assert(!thrown.getMessage.endsWith("null")) + } + + /** + * Builds a coordinator for a single-source region whose only output port has an unresolved + * schema (`Left(cause)`) and a configured output storage, so that the non-dependee phase + * reaches `createOutputPortStorageObjects` and attempts to read that schema. + */ + private def coordinatorWithUnresolvedOutputSchema( + cause: Throwable + ): RegionExecutionCoordinator = { + val portId = PortIdentity(0) + val baseOp = createSourceOp("schema-missing-op").withOutputPorts(List(OutputPort(portId))) + val (outPort, links, _) = baseOp.outputPorts(portId) + val physicalOp = + baseOp.copy(outputPorts = baseOp.outputPorts.updated(portId, (outPort, links, Left(cause)))) + + val workerId = createWorkerId(physicalOp) + val globalPortId = GlobalPortIdentity(physicalOp.id, portId) + val storageBase = + VFSURIFactory.createPortBaseURI(DEFAULT_WORKFLOW_ID, DEFAULT_EXECUTION_ID, globalPortId) + val region = Region( + RegionIdentity(1), + physicalOps = Set(physicalOp), + physicalLinks = Set.empty, + resourceConfig = Some( + ResourceConfig( + operatorConfigs = Map(physicalOp.id -> OperatorConfig(List(WorkerConfig(workerId)))), + portConfigs = Map(globalPortId -> OutputPortConfig(storageBase)) + ) + ) + ) + + val workflowExecution = WorkflowExecution() + seedReusableWorkerExecution(workflowExecution, seedRegionId = 0, physicalOp, workerId) + workflowExecution.initRegionExecution(region) + + val rpcProbe = new ControllerRpcProbe(_ => None) + val controller = createControllerHarness() + registerLiveWorker(controller.actorRefService, workerId) + + new RegionExecutionCoordinator( + 
region, + isRestart = false, + workflowExecution, + rpcProbe.asyncRPCClient, + ControllerConfig(None, None, None, None), + controller.actorService, + controller.actorRefService + ) + } + private case class SingleRegionFixture( coordinator: RegionExecutionCoordinator, rpcProbe: ControllerRpcProbe,
