This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new aec0009c3a test(amber): add unit test coverage for Region (#4568)
aec0009c3a is described below
commit aec0009c3a96450946e37cb778aee0152fe7f6e9
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 30 01:48:32 2026 -0700
test(amber): add unit test coverage for Region (#4568)
### What changes were proposed in this PR?
Add `RegionSpec` covering the public surface of `Region`
(`amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Region.scala`):
- Constructor exposes `physicalOps`, `physicalLinks`, and `ports` via
`getOperators`/`getLinks`/`getPorts`
- `getOperator(id)` looks up by `PhysicalOpIdentity` and throws
`NoSuchElementException` for unknown ids
- `topologicalIterator` yields operators in topological order derived
from `physicalLinks`
- `getSourceOperators` treats operators without input ports as sources
- `getStarterOperators` equals `getSourceOperators` when no
`resourceConfig` is provided
### Any related issues, documentation, discussions?
Closes #4567
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.scheduling.RegionSpec"` —
9/9 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../architecture/scheduling/RegionSpec.scala | 124 +++++++++++++++++++++
1 file changed, 124 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionSpec.scala
new file mode 100644
index 0000000000..0aacaaeae2
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionSpec.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling
+
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.virtualidentity.{
+ ExecutionIdentity,
+ OperatorIdentity,
+ PhysicalOpIdentity,
+ WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.{
+ GlobalPortIdentity,
+ PhysicalLink,
+ PhysicalOp,
+ PortIdentity
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class RegionSpec extends AnyFlatSpec {
+
+ private def physicalOpId(opId: String): PhysicalOpIdentity =
+ PhysicalOpIdentity(OperatorIdentity(opId), "main")
+
+ private def op(opId: String): PhysicalOp =
+ PhysicalOp(
+ physicalOpId(opId),
+ WorkflowIdentity(0),
+ ExecutionIdentity(0),
+ OpExecInitInfo.Empty
+ )
+
+ private def link(fromOp: String, toOp: String): PhysicalLink =
+ PhysicalLink(physicalOpId(fromOp), PortIdentity(0), physicalOpId(toOp),
PortIdentity(0))
+
+ "Region" should "expose the physical operators provided at construction" in {
+ val a = op("a")
+ val b = op("b")
+ val region = Region(RegionIdentity(1), Set(a, b), Set.empty)
+
+ assert(region.getOperators == Set(a, b))
+ }
+
+ it should "expose the physical links provided at construction" in {
+ val a = op("a")
+ val b = op("b")
+ val ab = link("a", "b")
+ val region = Region(RegionIdentity(1), Set(a, b), Set(ab))
+
+ assert(region.getLinks == Set(ab))
+ }
+
+ it should "default ports to an empty set" in {
+ val region = Region(RegionIdentity(1), Set(op("a")), Set.empty)
+ assert(region.getPorts.isEmpty)
+ }
+
+ it should "expose the ports provided at construction" in {
+ val portId = GlobalPortIdentity(physicalOpId("a"), PortIdentity(0), input
= true)
+ val region = Region(RegionIdentity(1), Set(op("a")), Set.empty, ports =
Set(portId))
+ assert(region.getPorts == Set(portId))
+ }
+
+ "Region.getOperator" should "look up a physical operator by id" in {
+ val a = op("a")
+ val b = op("b")
+ val region = Region(RegionIdentity(1), Set(a, b), Set.empty)
+
+ assert(region.getOperator(physicalOpId("a")) == a)
+ assert(region.getOperator(physicalOpId("b")) == b)
+ }
+
+ it should "throw NoSuchElementException for an unknown operator id" in {
+ val region = Region(RegionIdentity(1), Set(op("a")), Set.empty)
+ assertThrows[NoSuchElementException] {
+ region.getOperator(physicalOpId("missing"))
+ }
+ }
+
+ "Region.topologicalIterator" should "yield operators in topological order
based on physical links" in {
+ val a = op("a")
+ val b = op("b")
+ val c = op("c")
+ val region = Region(RegionIdentity(1), Set(a, b, c), Set(link("a", "b"),
link("b", "c")))
+
+ assert(
+ region.topologicalIterator().toList ==
+ List(physicalOpId("a"), physicalOpId("b"), physicalOpId("c"))
+ )
+ }
+
+ "Region.getSourceOperators" should "treat operators without input ports as
sources" in {
+ val a = op("a")
+ val b = op("b")
+ val region = Region(RegionIdentity(1), Set(a, b), Set.empty)
+
+ assert(region.getSourceOperators == Set(a, b))
+ }
+
+ "Region.getStarterOperators" should "match getSourceOperators when no
resource config is provided" in {
+ val a = op("a")
+ val b = op("b")
+ val region = Region(RegionIdentity(1), Set(a, b), Set.empty)
+
+ assert(region.getStarterOperators == region.getSourceOperators)
+ }
+}