This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ae4ce38d8 test(amber): add unit test coverage for SchedulingUtils (#4566)
3ae4ce38d8 is described below

commit 3ae4ce38d8845a1dcae7e7d6fca225467e8bffe5
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 30 15:51:05 2026 -0700

    test(amber): add unit test coverage for SchedulingUtils (#4566)
    
    ### What changes were proposed in this PR?
    
    Add `SchedulingUtilsSpec` covering the single public function
    `SchedulingUtils.replaceVertex`
    (`amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtils.scala`),
    which rewrites a vertex (and its incident edges) in a JGraphT
    `DirectedAcyclicGraph[Region, RegionLink]`.
    
    Minor: removed the duplicate, dead copy of `replaceVertex` from the `ScheduleGenerator` companion object.
    
    Cases:
    - Isolated vertex with no incident edges
    - Outgoing edges are rewritten to originate from the new vertex
    - Incoming edges are rewritten to terminate at the new vertex
    - Chain-middle replacement preserves upstream and downstream edges
    - No-op when old and new vertices are equal
    
    ### Any related issues, documentation, discussions?
    
    Closes #4565
    
    ### How was this PR tested?
    
    `sbt "WorkflowExecutionService/testOnly
    org.apache.texera.amber.engine.architecture.scheduling.SchedulingUtilsSpec"`
    — 5/5 tests pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../scheduling/ScheduleGenerator.scala             |  33 -----
 .../architecture/scheduling/SchedulingUtils.scala  |   2 +-
 .../scheduling/SchedulingUtilsSpec.scala           | 140 +++++++++++++++++++++
 3 files changed, 141 insertions(+), 34 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ScheduleGenerator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ScheduleGenerator.scala
index 0fe8ffc963..a5748fc71b 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ScheduleGenerator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ScheduleGenerator.scala
@@ -25,43 +25,10 @@ import 
org.apache.texera.amber.engine.architecture.scheduling.resourcePolicies.{
   DefaultResourceAllocator,
   ExecutionClusterInfo
 }
-import org.jgrapht.graph.DirectedAcyclicGraph
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.CollectionHasAsScala
 
-object ScheduleGenerator {
-  def replaceVertex(
-      graph: DirectedAcyclicGraph[Region, RegionLink],
-      oldVertex: Region,
-      newVertex: Region
-  ): Unit = {
-    if (oldVertex.equals(newVertex)) {
-      return
-    }
-    graph.addVertex(newVertex)
-    graph
-      .outgoingEdgesOf(oldVertex)
-      .asScala
-      .toList
-      .foreach(oldEdge => {
-        val dest = graph.getEdgeTarget(oldEdge)
-        graph.removeEdge(oldEdge)
-        graph.addEdge(newVertex, dest, RegionLink(newVertex.id, dest.id))
-      })
-    graph
-      .incomingEdgesOf(oldVertex)
-      .asScala
-      .toList
-      .foreach(oldEdge => {
-        val source = graph.getEdgeSource(oldEdge)
-        graph.removeEdge(oldEdge)
-        graph.addEdge(source, newVertex, RegionLink(source.id, newVertex.id))
-      })
-    graph.removeVertex(oldVertex)
-  }
-}
-
 abstract class ScheduleGenerator(
     workflowContext: WorkflowContext,
     var physicalPlan: PhysicalPlan
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtils.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtils.scala
index 72cd215862..4a287d6c6f 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtils.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtils.scala
@@ -24,7 +24,7 @@ import org.jgrapht.graph.DirectedAcyclicGraph
 import scala.jdk.CollectionConverters.CollectionHasAsScala
 
 object SchedulingUtils {
-
+  // TODO: remove this function
   def replaceVertex(
       graph: DirectedAcyclicGraph[Region, RegionLink],
       oldVertex: Region,
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtilsSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtilsSpec.scala
new file mode 100644
index 0000000000..18ee5e88f1
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtilsSpec.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling
+
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.virtualidentity.{
+  ExecutionIdentity,
+  OperatorIdentity,
+  PhysicalOpIdentity,
+  WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.PhysicalOp
+import org.jgrapht.graph.DirectedAcyclicGraph
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.jdk.CollectionConverters.CollectionHasAsScala
+
+class SchedulingUtilsSpec extends AnyFlatSpec {
+
+  private def region(regionId: Long, opId: String): Region = {
+    val physicalOp = PhysicalOp(
+      PhysicalOpIdentity(OperatorIdentity(opId), "main"),
+      WorkflowIdentity(0),
+      ExecutionIdentity(0),
+      OpExecInitInfo.Empty
+    )
+    Region(RegionIdentity(regionId), Set(physicalOp), Set.empty)
+  }
+
+  private def newGraph(): DirectedAcyclicGraph[Region, RegionLink] =
+    new DirectedAcyclicGraph[Region, RegionLink](classOf[RegionLink])
+
+  "SchedulingUtils.replaceVertex" should "replace an isolated vertex with no 
incident edges" in {
+    val graph = newGraph()
+    val oldVertex = region(1, "a")
+    val newVertex = region(1, "a-prime")
+    graph.addVertex(oldVertex)
+
+    SchedulingUtils.replaceVertex(graph, oldVertex, newVertex)
+
+    assert(!graph.containsVertex(oldVertex))
+    assert(graph.containsVertex(newVertex))
+    assert(graph.edgeSet().isEmpty)
+  }
+
+  it should "rewrite outgoing edges to originate from the new vertex" in {
+    val graph = newGraph()
+    val oldVertex = region(1, "a")
+    val downstream = region(2, "b")
+    val newVertex = region(1, "a-prime")
+    graph.addVertex(oldVertex)
+    graph.addVertex(downstream)
+    graph.addEdge(oldVertex, downstream, RegionLink(oldVertex.id, 
downstream.id))
+
+    SchedulingUtils.replaceVertex(graph, oldVertex, newVertex)
+
+    assert(!graph.containsVertex(oldVertex))
+    assert(graph.containsVertex(newVertex))
+    val outgoing = graph.outgoingEdgesOf(newVertex).asScala.toList
+    assert(outgoing.size == 1)
+    assert(graph.getEdgeTarget(outgoing.head) == downstream)
+    assert(outgoing.head == RegionLink(newVertex.id, downstream.id))
+  }
+
+  it should "rewrite incoming edges to terminate at the new vertex" in {
+    val graph = newGraph()
+    val upstream = region(0, "u")
+    val oldVertex = region(1, "a")
+    val newVertex = region(1, "a-prime")
+    graph.addVertex(upstream)
+    graph.addVertex(oldVertex)
+    graph.addEdge(upstream, oldVertex, RegionLink(upstream.id, oldVertex.id))
+
+    SchedulingUtils.replaceVertex(graph, oldVertex, newVertex)
+
+    assert(!graph.containsVertex(oldVertex))
+    val incoming = graph.incomingEdgesOf(newVertex).asScala.toList
+    assert(incoming.size == 1)
+    assert(graph.getEdgeSource(incoming.head) == upstream)
+    assert(incoming.head == RegionLink(upstream.id, newVertex.id))
+  }
+
+  it should "preserve both upstream and downstream edges in a chain" in {
+    val graph = newGraph()
+    val upstream = region(0, "u")
+    val oldVertex = region(1, "a")
+    val downstream = region(2, "d")
+    val newVertex = region(1, "a-prime")
+    graph.addVertex(upstream)
+    graph.addVertex(oldVertex)
+    graph.addVertex(downstream)
+    graph.addEdge(upstream, oldVertex, RegionLink(upstream.id, oldVertex.id))
+    graph.addEdge(oldVertex, downstream, RegionLink(oldVertex.id, 
downstream.id))
+
+    SchedulingUtils.replaceVertex(graph, oldVertex, newVertex)
+
+    assert(graph.vertexSet().asScala.toSet == Set(upstream, newVertex, 
downstream))
+    assert(
+      graph.edgeSet().asScala.toSet ==
+        Set(
+          RegionLink(upstream.id, newVertex.id),
+          RegionLink(newVertex.id, downstream.id)
+        )
+    )
+  }
+
+  it should "leave the graph unchanged when old and new vertices are equal" in 
{
+    val graph = newGraph()
+    val upstream = region(0, "u")
+    val vertex = region(1, "a")
+    val downstream = region(2, "d")
+    graph.addVertex(upstream)
+    graph.addVertex(vertex)
+    graph.addVertex(downstream)
+    graph.addEdge(upstream, vertex, RegionLink(upstream.id, vertex.id))
+    graph.addEdge(vertex, downstream, RegionLink(vertex.id, downstream.id))
+
+    SchedulingUtils.replaceVertex(graph, vertex, vertex)
+
+    assert(graph.vertexSet().asScala.toSet == Set(upstream, vertex, 
downstream))
+    assert(graph.edgeSet().size == 2)
+  }
+}

Reply via email to