This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-region-gui
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 8eb1b9808e572d7b212eddec41058bd738c6e1c1
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Sep 21 20:23:38 2025 -0700

    init
---
 .../engine/architecture/controller/ClientEvent.scala      |  2 ++
 .../amber/engine/architecture/controller/Controller.scala | 11 +++++++++++
 .../architecture/controller/WorkflowScheduler.scala       |  2 +-
 .../amber/engine/architecture/scheduling/Schedule.scala   |  2 ++
 .../ics/texera/web/service/WorkflowExecutionService.scala | 15 +++++++++++++++
 5 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ClientEvent.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ClientEvent.scala
index d9cfe9c689..7cd0f91c2f 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ClientEvent.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ClientEvent.scala
@@ -46,3 +46,5 @@ case class UpdateExecutorCompleted(id: ActorVirtualIdentity) extends ClientEvent
 final case class ReplayStatusUpdate(id: ActorVirtualIdentity, status: Boolean) extends ClientEvent
 
 final case class WorkflowRecoveryStatus(isRecovering: Boolean) extends ClientEvent
+
+final case class WorkflowRecoveryStatus(isRecovering: Boolean) extends ClientEvent
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/Controller.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/Controller.scala
index bd54fd2613..8ce4a8eb4e 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/Controller.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/Controller.scala
@@ -111,6 +111,17 @@ class Controller(
   override def initState(): Unit = {
     attachRuntimeServicesToCPState()
     cp.workflowScheduler.updateSchedule(physicalPlan)
+
+    println("ewrfewf", cp.workflowScheduler.schedule.getRegions)
+
+
+    cp.asyncRPCClient.sendToClient(
+      ExecutionStatsUpdate(
+        cp.workflowExecution.getAllRegionExecutionsStats
+      )
+    )
+
+
     val controllerRestoreConf = controllerConfig.stateRestoreConfOpt
     if (controllerRestoreConf.isDefined) {
       globalReplayManager.markRecoveryStatus(CONTROLLER, isRecovering = true)
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala
index 732b970f03..78053d056b 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -32,7 +32,7 @@ class WorkflowScheduler(
     actorId: ActorVirtualIdentity
 ) extends java.io.Serializable {
   var physicalPlan: PhysicalPlan = _
-  private var schedule: Schedule = _
+  var schedule: Schedule = _
 
   /**
     * Update the schedule to be executed, based on the given physicalPlan.
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Schedule.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Schedule.scala
index c3ecbc1ae7..edc4d5848c 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Schedule.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Schedule.scala
@@ -22,6 +22,8 @@ package edu.uci.ics.amber.engine.architecture.scheduling
 case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] {
   private var currentLevel = levelSets.keys.minOption.getOrElse(0)
 
+  def getRegions: List[Region] = levelSets.values.flatten.toList
+
   override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel)
 
   override def next(): Set[Region] = {
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala
index b6c8cbb88e..7e09bc3513 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala
@@ -90,6 +90,21 @@ class WorkflowExecutionService(
     })
   )
 
+  addSubscription(
+    client
+      .registerCallback[ExecutionStatsUpdate]((evt: ExecutionStatsUpdate) => {
+        stateStore.statsStore.updateState { statsStore =>
+          statsStore.withOperatorInfo(evt.operatorMetrics)
+        }
+        metricsPersistThread.foreach { thread =>
+          thread.execute(() => {
+            storeRuntimeStatistics(computeStatsDiff(evt.operatorMetrics))
+            lastPersistedMetrics = Some(evt.operatorMetrics)
+          })
+        }
+      })
+  )
+
   private def createStateEvent(state: ExecutionMetadataStore): WorkflowStateEvent = {
     if (state.isRecovering && state.state != COMPLETED) {
       WorkflowStateEvent("Recovering")

Reply via email to