This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 44bc12f4e9 test(amber): add unit test coverage for WorkflowExecution
(#4572)
44bc12f4e9 is described below
commit 44bc12f4e990ce553dc298e0bf9a77021c027d12
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 30 20:29:27 2026 -0700
test(amber): add unit test coverage for WorkflowExecution (#4572)
### What changes were proposed in this PR?
Add `WorkflowExecutionSpec` covering the lifecycle and lookup contract
of `WorkflowExecution`
(`amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala`):
- `initRegionExecution` creates a new `RegionExecution`; throws on
re-init for the same region id
- `hasRegionExecution` reflects init state
- `getRegionExecution` throws `NoSuchElementException` for unknown ids
- `getAllRegionExecutions` preserves insertion order (LinkedHashMap)
- `restartRegionExecution` behaves like a fresh init when no prior
region execution exists, and replaces a completed region execution with
a fresh one
- `getRunningRegionExecutions` excludes completed regions
- `getState` returns UNINITIALIZED on empty and COMPLETED when every
initialized region is completed
- `getLatestOperatorExecutionOption` returns `None` when no match exists, and
the latest match across regions otherwise (the test compares by reference
identity because `OperatorExecution` is a no-field case class)
### Any related issues, documentation, discussions?
Closes #4571
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecutionSpec"`
— 12/12 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../execution/WorkflowExecutionSpec.scala | 167 +++++++++++++++++++++
1 file changed, 167 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecutionSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecutionSpec.scala
new file mode 100644
index 0000000000..94285e7f8c
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecutionSpec.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.execution
+
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.virtualidentity.{
+ ExecutionIdentity,
+ OperatorIdentity,
+ PhysicalOpIdentity,
+ WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.PhysicalOp
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.apache.texera.amber.engine.architecture.scheduling.{Region,
RegionIdentity}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class WorkflowExecutionSpec extends AnyFlatSpec { // Unit tests for WorkflowExecution's region-execution lifecycle, lookup and state methods.
+
+ private def physicalOpId(opId: String): PhysicalOpIdentity = // Fixture: identity for operator `opId`, always paired with the literal "main".
+ PhysicalOpIdentity(OperatorIdentity(opId), "main")
+
+ private def op(opId: String): PhysicalOp = // Fixture: PhysicalOp for `opId` using workflow/execution identity 0 and OpExecInitInfo.Empty.
+ PhysicalOp(
+ physicalOpId(opId),
+ WorkflowIdentity(0),
+ ExecutionIdentity(0),
+ OpExecInitInfo.Empty
+ )
+
+ /** A region with no ports — its `RegionExecution.getState` defaults to
COMPLETED. */
+ private def region(regionId: Long, opId: String): Region = // Fixture: region `regionId` holding the single op built by `op(opId)`; the last argument is an empty Set.
+ Region(RegionIdentity(regionId), Set(op(opId)), Set.empty)
+
+ "WorkflowExecution.initRegionExecution" should "create a new RegionExecution
for the given region" in {
+ val we = WorkflowExecution()
+ val r = region(1, "a")
+
+ val regionExecution = we.initRegionExecution(r)
+
+ assert(regionExecution.region == r)
+ assert(we.getRegionExecution(r.id) eq regionExecution) // lookup must return the very instance that init returned (reference equality)
+ }
+
+ it should "throw when called twice for the same region id" in {
+ val we = WorkflowExecution()
+ val r = region(1, "a")
+ we.initRegionExecution(r)
+
+ assertThrows[AssertionError] { // a second init for the same region is expected to fail with AssertionError
+ we.initRegionExecution(r)
+ }
+ }
+
+ "WorkflowExecution.hasRegionExecution" should "be false before init and true
after" in {
+ val we = WorkflowExecution()
+ val r = region(1, "a")
+
+ assert(!we.hasRegionExecution(r.id)) // nothing has been initialized for this id yet
+ we.initRegionExecution(r)
+ assert(we.hasRegionExecution(r.id))
+ }
+
+ "WorkflowExecution.getRegionExecution" should "throw NoSuchElementException
for an unknown region id" in {
+ val we = WorkflowExecution()
+ assertThrows[NoSuchElementException] {
+ we.getRegionExecution(RegionIdentity(99)) // id 99 is never initialized on this instance
+ }
+ }
+
+ "WorkflowExecution.getAllRegionExecutions" should "preserve the insertion
order of region executions" in {
+ val we = WorkflowExecution()
+ val r0 = region(0, "a")
+ val r1 = region(1, "b")
+ val r2 = region(2, "c")
+
+ val e0 = we.initRegionExecution(r0)
+ val e1 = we.initRegionExecution(r1)
+ val e2 = we.initRegionExecution(r2)
+
+ assert(we.getAllRegionExecutions.toList == List(e0, e1, e2)) // expected order is the init order above
+ }
+
+ "WorkflowExecution.restartRegionExecution" should "behave like a fresh init
when no prior region execution exists" in {
+ val we = WorkflowExecution()
+ val r = region(1, "a")
+
+ val regionExecution = we.restartRegionExecution(r) // restart without any prior init for this region
+
+ assert(we.hasRegionExecution(r.id))
+ assert(we.getRegionExecution(r.id) eq regionExecution)
+ }
+
+ it should "replace an existing completed region execution with a fresh one"
in {
+ val we = WorkflowExecution()
+ val r = region(1, "a")
+ val original = we.initRegionExecution(r)
+ assert(original.isCompleted) // precondition: the fixture region reports completed right after init
+
+ val replacement = we.restartRegionExecution(r)
+
+ assert(replacement ne original) // restart must hand back a different instance
+ assert(we.getRegionExecution(r.id) eq replacement)
+ }
+
+ "WorkflowExecution.getRunningRegionExecutions" should "exclude completed
region executions" in {
+ val we = WorkflowExecution()
+ val r = region(1, "a")
+ val regionExecution = we.initRegionExecution(r)
+ assert(regionExecution.isCompleted) // precondition: the only region is already completed
+
+ assert(we.getRunningRegionExecutions.toList.isEmpty)
+ }
+
+ "WorkflowExecution.getState" should "return UNINITIALIZED when no regions
have been initialized" in {
+ val we = WorkflowExecution()
+ assert(we.getState == WorkflowAggregatedState.UNINITIALIZED)
+ assert(!we.isCompleted) // an execution with no regions is expected not to count as completed
+ }
+
+ it should "return COMPLETED when every initialized region is completed" in {
+ val we = WorkflowExecution()
+ we.initRegionExecution(region(0, "a"))
+ we.initRegionExecution(region(1, "b"))
+
+ assert(we.getState == WorkflowAggregatedState.COMPLETED)
+ assert(we.isCompleted)
+ }
+
+ "WorkflowExecution.getLatestOperatorExecutionOption" should "return None
when no operator execution exists for the id" in {
+ val we = WorkflowExecution()
+ we.initRegionExecution(region(0, "a")) // a region exists, but no operator execution is initialized in it
+
+
assert(we.getLatestOperatorExecutionOption(physicalOpId("never-initialized")).isEmpty)
+ }
+
+ it should "return the latest matching operator execution across regions" in {
+ val we = WorkflowExecution()
+ val regionA = we.initRegionExecution(region(0, "a"))
+ val regionB = we.initRegionExecution(region(1, "b"))
+
+ val olderExecution = regionA.initOperatorExecution(physicalOpId("a")) // operator id "a" registered in the region initialized first
+ val newerExecution = regionB.initOperatorExecution(physicalOpId("a")) // same operator id registered again in the region initialized second
+
+ val result = we.getLatestOperatorExecutionOption(physicalOpId("a"))
+ // Use reference identity: OperatorExecution is a no-field case class so
+ // instances are structurally equal; only `eq` distinguishes them.
+ assert(result.exists(_ eq newerExecution))
+ assert(!result.exists(_ eq olderExecution))
+ }
+}