This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-scheduler-jump
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-scheduler-jump by this
push:
new 560b670a67 update
560b670a67 is described below
commit 560b670a6716021b623aba7cb9d34343d9d2c36e
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Apr 27 22:22:12 2026 -0700
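update: extract the controller's schedule-cursor callbacks into named methods and precompute the operator-to-level lookup in Schedule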
---
.../controller/ControllerProcessor.scala | 64 +++++++++++++---------
.../engine/architecture/scheduling/Schedule.scala | 7 +++
2 files changed, 44 insertions(+), 27 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
index c0dd2c7ce8..094bbb876c 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
@@ -43,36 +43,46 @@ class ControllerProcessor(
val workflowExecution: WorkflowExecution = WorkflowExecution()
val workflowScheduler: WorkflowScheduler =
new WorkflowScheduler(workflowContext, actorId)
- // The coordinator consumes regions through callbacks rather than reading
Schedule directly.
- // This cursor tracks the next ranked level to execute and can be reset when
control flow
- // requests jumping back to the region containing a target operator.
+ // Cursor into the schedule: the level that the next getNextScheduledRegions call reads.
+ // None means no level has been chosen yet; the next pull then starts from startingLevel.
private var nextRegionLevel: Option[Int] = None
- val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new
WorkflowExecutionCoordinator(
- () => {
- Option(this.workflowScheduler.getSchedule)
- .map { schedule =>
- if (nextRegionLevel.isEmpty) {
- nextRegionLevel = Some(schedule.startingLevel)
- }
- nextRegionLevel
- .filter(schedule.levelSets.contains)
- .map { level =>
- nextRegionLevel = Some(level + 1)
- schedule.levelSets(level)
- }
- .getOrElse(Set.empty)
- }
- .getOrElse(Set.empty)
- },
- opId => {
- Option(this.workflowScheduler.getSchedule).foreach { schedule =>
- nextRegionLevel = schedule.levelSets.collectFirst {
- case (level, regions)
- if regions.exists(_.getOperators.exists(_.id.logicalOpId ==
opId)) =>
- level
+
+ /**
+ * Returns the regions of the next schedule level and advances the cursor past that level.
+ *
+ * The coordinator pulls regions through this callback instead of reading the schedule
+ * directly. The controller therefore owns the cursor and can move it when control flow
+ * jumps back to an earlier region (see `jumpToRegionContainingOperator`).
+ *
+ * @return the regions stored at `nextRegionLevel`; an empty set if `getSchedule` returns
+ *         null or the cursor is not one of the schedule's level keys.
+ */
+ private def getNextScheduledRegions():
Set[org.apache.texera.amber.engine.architecture.scheduling.Region] = {
+ Option(this.workflowScheduler.getSchedule)
+ .map { schedule =>
+ // No level chosen yet (first pull, or a jump to an unknown operator): begin at the
+ // schedule's lowest level.
+ if (nextRegionLevel.isEmpty) {
+ nextRegionLevel = Some(schedule.startingLevel)
}
+ // Once the cursor moves past the known levels, an empty set is returned.
+ // NOTE(review): this also stops at the first gap if level keys are not contiguous
+ // integers — presumably they are; confirm against the scheduler.
+ nextRegionLevel
+ .filter(schedule.levelSets.contains)
+ .map { level =>
+ // Advance the cursor before handing out this level's regions.
+ nextRegionLevel = Some(level + 1)
+ schedule.levelSets(level)
+ }
+ .getOrElse(Set.empty)
}
- },
+ .getOrElse(Set.empty)
+ }
+
+ /**
+ * Moves the schedule cursor so that the next coordinator pull returns the level containing
+ * the given operator. The lookup goes through `Schedule.getLevelOfOperator`, which is
+ * precomputed because loop control flow may jump repeatedly and should not rescan every
+ * level set on each jump.
+ *
+ * Does nothing if `getSchedule` returns null.
+ *
+ * @param opId logical operator whose level execution should resume from
+ */
+ private def jumpToRegionContainingOperator(
+ opId: org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+ ): Unit = {
+ Option(this.workflowScheduler.getSchedule).foreach { schedule =>
+ // NOTE(review): if opId is not in the schedule, this sets the cursor to None. The next
+ // pull treats None as "not started" and restarts from startingLevel, i.e. the whole
+ // schedule reruns. Confirm this is the intended fallback rather than an error.
+ nextRegionLevel = schedule.getLevelOfOperator(opId)
+ }
+ }
+
+ val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new
WorkflowExecutionCoordinator(
+ getNextScheduledRegions,
+ jumpToRegionContainingOperator,
workflowExecution,
controllerConfig,
asyncRPCClient
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
index 65ed3f1fca..f0e0c5b21d 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
@@ -19,11 +19,18 @@
package org.apache.texera.amber.engine.architecture.scheduling
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+
case class Schedule(levelSets: Map[Int, Set[Region]]) extends Iterable[Set[Region]] {
val startingLevel: Int = levelSets.keys.minOption.getOrElse(0)
+ /**
+ * Logical operator id -> schedule level of the regions containing it. This is precomputed
+ * once so that repeated control-flow jumps do not rescan every level set.
+ *
+ * If an operator id appears at more than one level, the smallest level is kept. A jump then
+ * lands on the earliest region containing the operator, and the result does not depend on
+ * the iteration order of `levelSets`, which is a `Map` and not ordered by level.
+ */
+ private val operatorLevels: Map[OperatorIdentity, Int] =
+   levelSets.toSeq
+     .flatMap {
+       case (level, regions) =>
+         regions.toSeq.flatMap(region => region.getOperators.map(_.id.logicalOpId -> level))
+     }
+     .groupMapReduce(_._1)(_._2)(_ min _)
def getRegions: List[Region] = levelSets.values.flatten.toList
+ /**
+ * @param opId logical operator to look up
+ * @return the earliest level whose regions contain `opId`, or `None` if no region does
+ */
+ def getLevelOfOperator(opId: OperatorIdentity): Option[Int] = operatorLevels.get(opId)
+
override def iterator: Iterator[Set[Region]] = levelSets.keys.toSeq.sorted.iterator.map(levelSets)
}