This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new aec0009c3a test(amber): add unit test coverage for Region (#4568)
aec0009c3a is described below

commit aec0009c3a96450946e37cb778aee0152fe7f6e9
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 30 01:48:32 2026 -0700

    test(amber): add unit test coverage for Region (#4568)
    
    ### What changes were proposed in this PR?
    
    Add `RegionSpec` covering the public surface of `Region`
    
(`amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Region.scala`):
    
    - Constructor exposes `physicalOps`, `physicalLinks`, and `ports` via
    `getOperators`/`getLinks`/`getPorts`
    - `getOperator(id)` looks up by `PhysicalOpIdentity` and throws
    `NoSuchElementException` for unknown ids
    - `topologicalIterator` yields operators in topological order derived
    from `physicalLinks`
    - `getSourceOperators` treats operators without input ports as sources
    - `getStarterOperators` equals `getSourceOperators` when no
    `resourceConfig` is provided
    
    ### Any related issues, documentation, discussions?
    
    Closes #4567
    
    ### How was this PR tested?
    
    `sbt "WorkflowExecutionService/testOnly
    org.apache.texera.amber.engine.architecture.scheduling.RegionSpec"` —
    9/9 tests pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../architecture/scheduling/RegionSpec.scala       | 124 +++++++++++++++++++++
 1 file changed, 124 insertions(+)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionSpec.scala
new file mode 100644
index 0000000000..0aacaaeae2
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionSpec.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling
+
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.virtualidentity.{
+  ExecutionIdentity,
+  OperatorIdentity,
+  PhysicalOpIdentity,
+  WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.{
+  GlobalPortIdentity,
+  PhysicalLink,
+  PhysicalOp,
+  PortIdentity
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class RegionSpec extends AnyFlatSpec {
+
+  private def physicalOpId(opId: String): PhysicalOpIdentity =
+    PhysicalOpIdentity(OperatorIdentity(opId), "main")
+
+  private def op(opId: String): PhysicalOp =
+    PhysicalOp(
+      physicalOpId(opId),
+      WorkflowIdentity(0),
+      ExecutionIdentity(0),
+      OpExecInitInfo.Empty
+    )
+
+  private def link(fromOp: String, toOp: String): PhysicalLink =
+    PhysicalLink(physicalOpId(fromOp), PortIdentity(0), physicalOpId(toOp), 
PortIdentity(0))
+
+  "Region" should "expose the physical operators provided at construction" in {
+    val a = op("a")
+    val b = op("b")
+    val region = Region(RegionIdentity(1), Set(a, b), Set.empty)
+
+    assert(region.getOperators == Set(a, b))
+  }
+
+  it should "expose the physical links provided at construction" in {
+    val a = op("a")
+    val b = op("b")
+    val ab = link("a", "b")
+    val region = Region(RegionIdentity(1), Set(a, b), Set(ab))
+
+    assert(region.getLinks == Set(ab))
+  }
+
+  it should "default ports to an empty set" in {
+    val region = Region(RegionIdentity(1), Set(op("a")), Set.empty)
+    assert(region.getPorts.isEmpty)
+  }
+
+  it should "expose the ports provided at construction" in {
+    val portId = GlobalPortIdentity(physicalOpId("a"), PortIdentity(0), input 
= true)
+    val region = Region(RegionIdentity(1), Set(op("a")), Set.empty, ports = 
Set(portId))
+    assert(region.getPorts == Set(portId))
+  }
+
+  "Region.getOperator" should "look up a physical operator by id" in {
+    val a = op("a")
+    val b = op("b")
+    val region = Region(RegionIdentity(1), Set(a, b), Set.empty)
+
+    assert(region.getOperator(physicalOpId("a")) == a)
+    assert(region.getOperator(physicalOpId("b")) == b)
+  }
+
+  it should "throw NoSuchElementException for an unknown operator id" in {
+    val region = Region(RegionIdentity(1), Set(op("a")), Set.empty)
+    assertThrows[NoSuchElementException] {
+      region.getOperator(physicalOpId("missing"))
+    }
+  }
+
+  "Region.topologicalIterator" should "yield operators in topological order 
based on physical links" in {
+    val a = op("a")
+    val b = op("b")
+    val c = op("c")
+    val region = Region(RegionIdentity(1), Set(a, b, c), Set(link("a", "b"), 
link("b", "c")))
+
+    assert(
+      region.topologicalIterator().toList ==
+        List(physicalOpId("a"), physicalOpId("b"), physicalOpId("c"))
+    )
+  }
+
+  "Region.getSourceOperators" should "treat operators without input ports as 
sources" in {
+    val a = op("a")
+    val b = op("b")
+    val region = Region(RegionIdentity(1), Set(a, b), Set.empty)
+
+    assert(region.getSourceOperators == Set(a, b))
+  }
+
+  "Region.getStarterOperators" should "match getSourceOperators when no 
resource config is provided" in {
+    val a = op("a")
+    val b = op("b")
+    val region = Region(RegionIdentity(1), Set(a, b), Set.empty)
+
+    assert(region.getStarterOperators == region.getSourceOperators)
+  }
+}

Reply via email to