This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-cm-for-loop-mat-dcm
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-cm-for-loop-mat-dcm by this push:
     new f6b64b0fbe update
f6b64b0fbe is described below

commit f6b64b0fbe9f31eeb93f43a9d72487265b05c9bc
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Feb 4 19:58:40 2026 -0800

    update
---
 .../engine/architecture/controller/WorkflowScheduler.scala     |  6 +++++-
 .../architecture/controller/execution/RegionExecution.scala    |  2 --
 .../architecture/controller/execution/WorkflowExecution.scala  |  5 -----
 .../texera/amber/engine/architecture/scheduling/Schedule.scala | 10 +++++++++-
 4 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
index 9dcf3ad4bf..aa51f3f0cc 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -52,6 +52,10 @@ class WorkflowScheduler(
     this.physicalPlan = updatedPhysicalPlan
   }
 
-  def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next()
+  def getNextRegions: Set[Region] = {
+    val region : Set[Region] = if (!schedule.hasNext) Set() else schedule.loopNext()
+    println("current Region: " + region)
+    region
+  }
 
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala
index d5939c2e3b..e905c2b044 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala
@@ -59,8 +59,6 @@ case class RegionExecution(region: Region) {
       physicalOpId: PhysicalOpIdentity,
       inheritOperatorExecution: Option[OperatorExecution] = None
   ): OperatorExecution = {
-    assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution already exists.")
-
     operatorExecutions.getOrElseUpdate(
       physicalOpId,
       inheritOperatorExecution
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
index dea9b692a4..b8b6d68091 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
@@ -44,11 +44,6 @@ case class WorkflowExecution() {
     * @throws AssertionError if the `RegionExecution` has already been initialized.
     */
   def initRegionExecution(region: Region): RegionExecution = {
-    // ensure the region execution hasn't been initialized already.
-    assert(
-      !regionExecutions.contains(region.id),
-      s"RegionExecution of ${region.id} already initialized."
-    )
     regionExecutions.getOrElseUpdate(region.id, RegionExecution(region))
   }
 
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
index 6f34c9ed1e..47474b8478 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
@@ -21,14 +21,22 @@ package org.apache.texera.amber.engine.architecture.scheduling
 
 case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] {
   private var currentLevel = levelSets.keys.minOption.getOrElse(0)
-
+  private var loopStartLevel = currentLevel
   def getRegions: List[Region] = levelSets.values.flatten.toList
 
   override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel)
 
   override def next(): Set[Region] = {
     val regions = levelSets(currentLevel)
+    if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-")))) loopStartLevel = currentLevel
     currentLevel += 1
     regions
   }
+
+  def loopNext(): Set[Region] = {
+    val regions = levelSets(currentLevel)
+    if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))) currentLevel = loopStartLevel
+    else currentLevel += 1
+    regions
+  }
 }

Reply via email to