This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 782c6f3e079470b9bafc12eb3d75615a2daad116
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Wed Jan 14 11:23:39 2026 -0800

    feat(cache): add operator and region statuses on the frontend.
---
 .../engine/architecture/rpc/controlreturns.proto   |  1 +
 .../controller/execution/ExecutionUtils.scala      | 14 ++++++---
 .../scheduling/RegionExecutionCoordinator.scala    | 14 ++++++---
 .../apache/texera/amber/engine/common/Utils.scala  |  3 ++
 .../app/workspace/component/menu/menu.component.ts |  5 ++-
 .../workflow-editor/workflow-editor.component.ts   | 36 ++++++++++++++++------
 .../workspace/service/joint-ui/joint-ui.service.ts |  3 ++
 .../model/workflow-action.service.ts               |  9 ++++++
 .../workspace/types/execute-workflow.interface.ts  |  1 +
 9 files changed, 66 insertions(+), 20 deletions(-)

diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto
index 43613b5cfd..0642c09a4a 100644
--- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto
+++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto
@@ -126,6 +126,7 @@ enum WorkflowAggregatedState {
   UNKNOWN = 8;
   KILLED = 9;
   TERMINATED = 10;
+  COMPLETED_FROM_CACHE = 11;
 }
 
 message StartWorkflowResponse {
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala
index 7ee9bc0473..2b028a5137 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala
@@ -50,7 +50,8 @@ object ExecutionUtils {
       WorkflowAggregatedState.RUNNING,
       WorkflowAggregatedState.UNINITIALIZED,
       WorkflowAggregatedState.PAUSED,
-      WorkflowAggregatedState.READY
+      WorkflowAggregatedState.READY,
+      Some(WorkflowAggregatedState.COMPLETED_FROM_CACHE)
     )
 
     def sumMetrics(
@@ -88,15 +89,20 @@ object ExecutionUtils {
       runningState: T,
       uninitializedState: T,
       pausedState: T,
-      readyState: T
+      readyState: T,
+      cachedState: Option[T] = None
   ): WorkflowAggregatedState = {
     states match {
       case _ if states.isEmpty                      => WorkflowAggregatedState.UNINITIALIZED
       case _ if states.forall(_ == completedState)  => WorkflowAggregatedState.COMPLETED
       case _ if states.forall(_ == terminatedState) => WorkflowAggregatedState.COMPLETED
-      case _ if states.exists(_ == runningState)    => WorkflowAggregatedState.RUNNING
+      case _ if cachedState.isDefined && states.forall(_ == cachedState.get) =>
+        WorkflowAggregatedState.COMPLETED_FROM_CACHE
+      case _ if states.exists(_ == runningState) => WorkflowAggregatedState.RUNNING
       case _ =>
-        val unCompletedStates = states.filter(_ != completedState)
+        val terminalStates =
+          Set(completedState, terminatedState) ++ cachedState.toSet
+        val unCompletedStates = states.filterNot(terminalStates.contains)
         if (unCompletedStates.forall(_ == uninitializedState)) {
           WorkflowAggregatedState.UNINITIALIZED
         } else if (unCompletedStates.forall(_ == pausedState)) {
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index e0b6aca821..9f7fe80adb 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -115,6 +115,7 @@ class RegionExecutionCoordinator(
   private case object ExecutingDependeePortsPhase extends RegionExecutionPhase
   private case object ExecutingNonDependeePortsPhase extends RegionExecutionPhase
   private case object Completed extends RegionExecutionPhase
+  private case object CompletedFromCache extends RegionExecutionPhase
 
   private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new AtomicReference(
     Unexecuted
@@ -149,7 +150,7 @@ class RegionExecutionCoordinator(
         .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L)))
         .toSeq
       val stats = OperatorMetrics(
-        WorkflowAggregatedState.COMPLETED,
+        WorkflowAggregatedState.COMPLETED_FROM_CACHE,
         OperatorStatistics(
           inputMetrics,
           outputMetrics
@@ -161,7 +162,7 @@ class RegionExecutionCoordinator(
     asyncRPCClient.sendToClient(
       ExecutionStatsUpdate(workflowExecution.getAllRegionExecutionsStats)
     )
-    setPhase(Completed)
+    setPhase(CompletedFromCache)
   }
 
   private def recordCachedOutputPortResults(resourceConfig: ResourceConfig): Unit = {
@@ -257,7 +258,10 @@ class RegionExecutionCoordinator(
     }
   }
 
-  def isCompleted: Boolean = currentPhaseRef.get == Completed
+  def isCompleted: Boolean = {
+    val phase = currentPhaseRef.get
+    phase == Completed || phase == CompletedFromCache
+  }
 
   /**
     * This will sync and transition the region execution phase from one to another depending on its current phase:
@@ -286,8 +290,8 @@ class RegionExecutionCoordinator(
         }
       case ExecutingNonDependeePortsPhase =>
         tryCompleteRegionExecution()
-      case Completed =>
-        // Already completed, no further action needed.
+      case Completed | CompletedFromCache =>
+        // Already completed or used cache, no further action needed.
         Future.Unit
     }
 
diff --git 
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala
index dc074c1094..e95e1eec91 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala
@@ -103,6 +103,7 @@ object Utils extends LazyLogging {
       case WorkflowAggregatedState.FAILED        => "Failed"
       case WorkflowAggregatedState.KILLED        => "Killed"
       case WorkflowAggregatedState.UNKNOWN       => "Unknown"
+      case WorkflowAggregatedState.COMPLETED_FROM_CACHE => "CompletedFromCache"
       case WorkflowAggregatedState.Unrecognized(unrecognizedValue) =>
         s"Unrecognized($unrecognizedValue)"
     }
@@ -122,6 +123,7 @@ object Utils extends LazyLogging {
       case "killed"        => WorkflowAggregatedState.KILLED
       case "terminated"    => WorkflowAggregatedState.TERMINATED
       case "unknown"       => WorkflowAggregatedState.UNKNOWN
+      case "completedfromcache" => WorkflowAggregatedState.COMPLETED_FROM_CACHE
       case other           => throw new IllegalArgumentException(s"Unrecognized state: $other")
     }
   }
@@ -141,6 +143,7 @@ object Utils extends LazyLogging {
       case WorkflowAggregatedState.COMPLETED     => 3
       case WorkflowAggregatedState.FAILED        => 4
       case WorkflowAggregatedState.KILLED        => 5
+      case WorkflowAggregatedState.COMPLETED_FROM_CACHE => 6
       case other                                 => -1
     }
   }
diff --git a/frontend/src/app/workspace/component/menu/menu.component.ts b/frontend/src/app/workspace/component/menu/menu.component.ts
index e821bf3add..a1ef27aa6b 100644
--- a/frontend/src/app/workspace/component/menu/menu.component.ts
+++ b/frontend/src/app/workspace/component/menu/menu.component.ts
@@ -471,10 +471,13 @@ export class MenuComponent implements OnInit, OnDestroy {
   }
 
   public toggleRegion(): void {
+    // Update shared state so WorkflowEditorComponent can use it when creating region elements
+    this.workflowActionService.setShowRegion(this.showRegion);
+    // Apply visibility to any existing region elements
     this.workflowActionService
       .getJointGraphWrapper()
       .jointGraph.getElements()
-      .filter(el => el.get("type") === "region") // small improvement here too
+      .filter(el => el.get("type") === "region")
       .forEach(el => el.attr("body/visibility", this.showRegion ? "visible" : "hidden"));
   }
 
diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
index b23f92caf3..2d43f80a61 100644
--- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
+++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
@@ -351,21 +351,39 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy
     );
 
     let regionMap: { regionElement: joint.dia.Element; operators: joint.dia.Cell[] }[] = [];
+    // Track whether regions should be visible (preserved across element recreation)
+    let regionsVisible = false;
+    const colorMap: Record<string, string> = {
+      ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)",
+      ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)",
+      Completed: "rgba(76,175,80,0.2)",
+      CompletedFromCache: "rgba(24,144,255,0.3)",
+    };
+
     // update region elements on execution
     this.executeWorkflowService
       .getRegionUpdateStream()
       .pipe(untilDestroyed(this))
       .subscribe(event => {
-        this.paper.model
-          .getCells()
-          .filter(element => element instanceof Region)
-          .forEach(element => element.remove());
+        // Preserve visibility state from existing elements before removing them
+        const existingRegions = this.paper.model.getCells().filter(element => element instanceof Region);
+        if (existingRegions.length > 0) {
+          regionsVisible = existingRegions[0].attr("body/visibility") === "visible";
+        } else {
+          // No existing regions - use the shared state from workflowActionService
+          regionsVisible = this.workflowActionService.getShowRegion();
+        }
+        existingRegions.forEach(element => element.remove());
 
         regionMap = event.regions.map(([id, region]) => {
           const element = new Region({ id: "region-" + id });
           const ops = region.map(id => this.paper.getModelById(id));
           this.paper.model.addCell(element);
           this.updateRegionElement(element, ops);
+          // Apply visibility state
+          if (regionsVisible) {
+            element.attr("body/visibility", "visible");
+          }
           return { regionElement: element, operators: ops };
         });
       });
@@ -381,12 +399,10 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy
       .getRegionStateStream()
       .pipe(untilDestroyed(this))
       .subscribe(region => {
-        const colorMap: Record<string, string> = {
-          ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)",
-          ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)",
-          Completed: "rgba(76,175,80,0.2)",
-        };
-        this.paper.getModelById("region-" + region.id).attr("body/fill", colorMap[region.state]);
+        const element = this.paper.getModelById("region-" + region.id);
+        if (element && colorMap[region.state]) {
+          element.attr("body/fill", colorMap[region.state]);
+        }
       });
   }
 
diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
index 10d0348773..4f9b984b4f 100644
--- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
+++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
@@ -404,6 +404,9 @@ export class JointUIService {
       case OperatorState.Completed:
         fillColor = "green";
         break;
+      case OperatorState.CompletedFromCache:
+        fillColor = "#1890ff";
+        break;
       case OperatorState.Pausing:
       case OperatorState.Paused:
         fillColor = "magenta";
diff --git a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
index ddee22465a..a6071e5f95 100644
--- a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
+++ b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts
@@ -97,6 +97,7 @@ export class WorkflowActionService {
 
   private workflowSettings: WorkflowSettings;
   private workflowResetSubject = new Subject<void>();
+  private _showRegion: boolean = false;
 
   constructor(
     private operatorMetadataService: OperatorMetadataService,
@@ -919,4 +920,12 @@ export class WorkflowActionService {
   public getHighlightingEnabled() {
     return this.highlightingEnabled;
   }
+
+  public setShowRegion(show: boolean): void {
+    this._showRegion = show;
+  }
+
+  public getShowRegion(): boolean {
+    return this._showRegion;
+  }
 }
diff --git a/frontend/src/app/workspace/types/execute-workflow.interface.ts b/frontend/src/app/workspace/types/execute-workflow.interface.ts
index 23ade23199..fcc22059f1 100644
--- a/frontend/src/app/workspace/types/execute-workflow.interface.ts
+++ b/frontend/src/app/workspace/types/execute-workflow.interface.ts
@@ -76,6 +76,7 @@ export enum OperatorState {
   Resuming = "Resuming",
   Completed = "Completed",
   Recovering = "Recovering",
+  CompletedFromCache = "CompletedFromCache",
 }
 
 export interface OperatorStatistics

Reply via email to