This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 3ae4ce38d8 test(amber): add unit test coverage for SchedulingUtils (#4566)
3ae4ce38d8 is described below
commit 3ae4ce38d8845a1dcae7e7d6fca225467e8bffe5
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 30 15:51:05 2026 -0700
test(amber): add unit test coverage for SchedulingUtils (#4566)
### What changes were proposed in this PR?
Add `SchedulingUtilsSpec`, covering the single public function
`SchedulingUtils.replaceVertex`
(`amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtils.scala`),
which replaces a vertex in a JGraphT `DirectedAcyclicGraph[Region, RegionLink]`
and rewires its incoming and outgoing edges to the replacement vertex.
Minor: removed the duplicate, dead `replaceVertex` from the `ScheduleGenerator` companion object; `SchedulingUtils.replaceVertex` is now the only copy.
Cases:
- Isolated vertex with no incident edges
- Outgoing edges are rewritten to originate from the new vertex
- Incoming edges are rewritten to terminate at the new vertex
- Chain-middle replacement preserves upstream and downstream edges
- No-op when old and new vertices are equal
### Any related issues, documentation, discussions?
Closes #4565
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.SchedulingUtilsSpec"`
— 5/5 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../scheduling/ScheduleGenerator.scala | 33 -----
.../architecture/scheduling/SchedulingUtils.scala | 2 +-
.../scheduling/SchedulingUtilsSpec.scala | 140 +++++++++++++++++++++
3 files changed, 141 insertions(+), 34 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ScheduleGenerator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ScheduleGenerator.scala
index 0fe8ffc963..a5748fc71b 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ScheduleGenerator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ScheduleGenerator.scala
@@ -25,43 +25,10 @@ import
org.apache.texera.amber.engine.architecture.scheduling.resourcePolicies.{
DefaultResourceAllocator,
ExecutionClusterInfo
}
-import org.jgrapht.graph.DirectedAcyclicGraph
import scala.collection.mutable
import scala.jdk.CollectionConverters.CollectionHasAsScala
-object ScheduleGenerator {
- def replaceVertex(
- graph: DirectedAcyclicGraph[Region, RegionLink],
- oldVertex: Region,
- newVertex: Region
- ): Unit = {
- if (oldVertex.equals(newVertex)) {
- return
- }
- graph.addVertex(newVertex)
- graph
- .outgoingEdgesOf(oldVertex)
- .asScala
- .toList
- .foreach(oldEdge => {
- val dest = graph.getEdgeTarget(oldEdge)
- graph.removeEdge(oldEdge)
- graph.addEdge(newVertex, dest, RegionLink(newVertex.id, dest.id))
- })
- graph
- .incomingEdgesOf(oldVertex)
- .asScala
- .toList
- .foreach(oldEdge => {
- val source = graph.getEdgeSource(oldEdge)
- graph.removeEdge(oldEdge)
- graph.addEdge(source, newVertex, RegionLink(source.id, newVertex.id))
- })
- graph.removeVertex(oldVertex)
- }
-}
-
abstract class ScheduleGenerator(
workflowContext: WorkflowContext,
var physicalPlan: PhysicalPlan
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtils.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtils.scala
index 72cd215862..4a287d6c6f 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtils.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtils.scala
@@ -24,7 +24,7 @@ import org.jgrapht.graph.DirectedAcyclicGraph
import scala.jdk.CollectionConverters.CollectionHasAsScala
object SchedulingUtils {
-
+ // TODO: remove this function
def replaceVertex(
graph: DirectedAcyclicGraph[Region, RegionLink],
oldVertex: Region,
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtilsSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtilsSpec.scala
new file mode 100644
index 0000000000..18ee5e88f1
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/SchedulingUtilsSpec.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling
+
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.virtualidentity.{
+ ExecutionIdentity,
+ OperatorIdentity,
+ PhysicalOpIdentity,
+ WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.PhysicalOp
+import org.jgrapht.graph.DirectedAcyclicGraph
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.jdk.CollectionConverters.CollectionHasAsScala
+
+/** Unit tests for `SchedulingUtils.replaceVertex` on a JGraphT region DAG. */
+class SchedulingUtilsSpec extends AnyFlatSpec {
+
+  private def region(regionId: Long, opId: String): Region = {
+    val physicalOp = PhysicalOp(
+      PhysicalOpIdentity(OperatorIdentity(opId), "main"),
+      WorkflowIdentity(0),
+      ExecutionIdentity(0),
+      OpExecInitInfo.Empty
+    )
+    Region(RegionIdentity(regionId), Set(physicalOp), Set.empty)
+  }
+
+  private def newGraph(): DirectedAcyclicGraph[Region, RegionLink] =
+    new DirectedAcyclicGraph[Region, RegionLink](classOf[RegionLink])
+
+  "SchedulingUtils.replaceVertex" should "replace an isolated vertex with no incident edges" in {
+    val graph = newGraph()
+    val oldVertex = region(1, "a")
+    // Same id, different operator: unequal regions, so replaceVertex must not short-circuit.
+    val newVertex = region(1, "a-prime")
+    graph.addVertex(oldVertex)
+
+    SchedulingUtils.replaceVertex(graph, oldVertex, newVertex)
+
+    assert(graph.vertexSet().asScala.toSet == Set(newVertex))
+    assert(graph.edgeSet().isEmpty)
+  }
+
+  it should "rewrite outgoing edges to originate from the new vertex" in {
+    val graph = newGraph()
+    val oldVertex = region(1, "a")
+    val downstream = region(2, "d")
+    // A distinct id makes the RegionLink assertion below prove the link was rebuilt.
+    val newVertex = region(9, "a-prime")
+    graph.addVertex(oldVertex)
+    graph.addVertex(downstream)
+    graph.addEdge(oldVertex, downstream, RegionLink(oldVertex.id, downstream.id))
+
+    SchedulingUtils.replaceVertex(graph, oldVertex, newVertex)
+
+    assert(graph.vertexSet().asScala.toSet == Set(newVertex, downstream))
+    val outgoing = graph.outgoingEdgesOf(newVertex).asScala.toList
+    assert(outgoing.size == 1)
+    assert(graph.getEdgeTarget(outgoing.head) == downstream)
+    assert(outgoing.head == RegionLink(newVertex.id, downstream.id))
+  }
+
+  it should "rewrite incoming edges to terminate at the new vertex" in {
+    val graph = newGraph()
+    val upstream = region(0, "u")
+    val oldVertex = region(1, "a")
+    val newVertex = region(9, "a-prime")
+    graph.addVertex(upstream)
+    graph.addVertex(oldVertex)
+    graph.addEdge(upstream, oldVertex, RegionLink(upstream.id, oldVertex.id))
+
+    SchedulingUtils.replaceVertex(graph, oldVertex, newVertex)
+
+    assert(graph.vertexSet().asScala.toSet == Set(upstream, newVertex))
+    val incoming = graph.incomingEdgesOf(newVertex).asScala.toList
+    assert(incoming.size == 1)
+    assert(graph.getEdgeSource(incoming.head) == upstream)
+    assert(incoming.head == RegionLink(upstream.id, newVertex.id))
+  }
+
+  it should "preserve both upstream and downstream edges in a chain" in {
+    val graph = newGraph()
+    val upstream = region(0, "u")
+    val oldVertex = region(1, "a")
+    val downstream = region(2, "d")
+    val newVertex = region(9, "a-prime")
+    graph.addVertex(upstream)
+    graph.addVertex(oldVertex)
+    graph.addVertex(downstream)
+    graph.addEdge(upstream, oldVertex, RegionLink(upstream.id, oldVertex.id))
+    graph.addEdge(oldVertex, downstream, RegionLink(oldVertex.id, downstream.id))
+
+    SchedulingUtils.replaceVertex(graph, oldVertex, newVertex)
+
+    assert(graph.vertexSet().asScala.toSet == Set(upstream, newVertex, downstream))
+    assert(
+      graph.edgeSet().asScala.toSet ==
+        Set(RegionLink(upstream.id, newVertex.id), RegionLink(newVertex.id, downstream.id))
+    )
+    // Endpoints, not just link ids, must be rewired to the new vertex.
+    assert(graph.containsEdge(upstream, newVertex) && graph.containsEdge(newVertex, downstream))
+  }
+
+  it should "leave the graph unchanged when old and new vertices are equal" in {
+    val graph = newGraph()
+    val upstream = region(0, "u")
+    val vertex = region(1, "a")
+    val downstream = region(2, "d")
+    graph.addVertex(upstream)
+    graph.addVertex(vertex)
+    graph.addVertex(downstream)
+    graph.addEdge(upstream, vertex, RegionLink(upstream.id, vertex.id))
+    graph.addEdge(vertex, downstream, RegionLink(vertex.id, downstream.id))
+
+    SchedulingUtils.replaceVertex(graph, vertex, vertex)
+
+    assert(graph.vertexSet().asScala.toSet == Set(upstream, vertex, downstream))
+    assert(graph.edgeSet().size == 2)
+  }
+}