This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 9b42aede28 test(amber): add unit test coverage for WorkflowScheduler (#4564)
9b42aede28 is described below
commit 9b42aede283aff5901c98a16b7168c2369994935
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 30 02:08:18 2026 -0700
test(amber): add unit test coverage for WorkflowScheduler (#4564)
### What changes were proposed in this PR?
Add `WorkflowSchedulerSpec` covering the public contract of
`WorkflowScheduler`
(`amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala`):
- `updateSchedule` populates `getSchedule` and `physicalPlan` from the
input workflow
- The produced schedule covers every operator in the (post-update)
physical plan
- `getNextRegions` exhausts the schedule and then returns an empty set
- The union of region sets pulled via `getNextRegions` matches
`getSchedule.getRegions`
### Any related issues, documentation, discussions?
Closes #4563
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.controller.WorkflowSchedulerSpec"`
— 4/4 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../controller/WorkflowSchedulerSpec.scala | 103 +++++++++++++++++++++
1 file changed, 103 insertions(+)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowSchedulerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowSchedulerSpec.scala
new file mode 100644
index 0000000000..ac7358b438
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowSchedulerSpec.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller
+
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.apache.texera.amber.engine.e2e.TestUtils.buildWorkflow
+import org.apache.texera.amber.operator.TestOperators
+import org.apache.texera.workflow.LogicalLink
+import org.scalatest.flatspec.AnyFlatSpec
+
+class WorkflowSchedulerSpec extends AnyFlatSpec {
+
+  /** Builds a two-operator workflow: headerless CSV scan -> keyword search on "column-1". */
+  private def buildHeaderlessCsvKeywordWorkflow() = {
+    val csvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
+    val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
+    buildWorkflow(
+      List(csvOpDesc, keywordOpDesc),
+      List(
+        LogicalLink(
+          csvOpDesc.operatorIdentifier,
+          PortIdentity(0),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(0)
+        )
+      ),
+      new WorkflowContext()
+    )
+  }
+
+  /** Returns a fresh scheduler on which `updateSchedule` has already been called. */
+  private def scheduledWorkflowScheduler(): WorkflowScheduler = {
+    val workflow = buildHeaderlessCsvKeywordWorkflow()
+    val scheduler = new WorkflowScheduler(workflow.context, CONTROLLER)
+    scheduler.updateSchedule(workflow.physicalPlan)
+    scheduler
+  }
+
+  /**
+   * Pulls `getNextRegions` until it returns an empty set and returns the non-empty
+   * levels in pull order. Each non-empty level holds at least one region, so a
+   * schedule that yields every region once needs at most `getRegions.size` such
+   * pulls; the cap turns a never-exhausted schedule into a test failure, not a hang.
+   */
+  private def drainLevels(scheduler: WorkflowScheduler) = {
+    val maxPulls = scheduler.getSchedule.getRegions.size + 1
+    val levels = Iterator
+      .continually(scheduler.getNextRegions)
+      .take(maxPulls)
+      .takeWhile(_.nonEmpty)
+      .toList
+    assert(levels.size < maxPulls, s"getNextRegions was not exhausted within $maxPulls pulls")
+    levels
+  }
+
+  "WorkflowScheduler.updateSchedule" should "populate the schedule and physicalPlan fields" in {
+    val workflow = buildHeaderlessCsvKeywordWorkflow()
+    val scheduler = new WorkflowScheduler(workflow.context, CONTROLLER)
+
+    // Both fields stay unset until updateSchedule is called.
+    assert(scheduler.getSchedule == null)
+    assert(scheduler.physicalPlan == null)
+
+    scheduler.updateSchedule(workflow.physicalPlan)
+
+    assert(scheduler.getSchedule != null)
+    assert(scheduler.physicalPlan != null)
+    assert(scheduler.getSchedule.getRegions.nonEmpty)
+  }
+
+  it should "include every workflow operator in some region of the produced schedule" in {
+    val scheduler = scheduledWorkflowScheduler()
+
+    val operatorsInSchedule =
+      scheduler.getSchedule.getRegions.flatMap(_.getOperators.map(_.id.logicalOpId)).toSet
+    // Compare against the scheduler's own (post-update) plan, not the input plan.
+    val operatorsInPlan = scheduler.physicalPlan.operators.map(_.id.logicalOpId)
+
+    assert(operatorsInPlan.subsetOf(operatorsInSchedule))
+  }
+
+  "WorkflowScheduler.getNextRegions" should "exhaust the schedule and then return an empty set" in {
+    val scheduler = scheduledWorkflowScheduler()
+
+    assert(drainLevels(scheduler).nonEmpty)
+    // Once exhausted, further pulls keep returning an empty set.
+    assert(scheduler.getNextRegions.isEmpty)
+  }
+
+  it should "yield region sets that together cover every region in the schedule" in {
+    val scheduler = scheduledWorkflowScheduler()
+
+    // Snapshot the expected regions before any pull advances the scheduler.
+    val expectedRegions = scheduler.getSchedule.getRegions.toSet
+    val pulledRegions = drainLevels(scheduler).flatten.toSet
+
+    assert(pulledRegions == expectedRegions)
+  }
+}