This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 782c6f3e079470b9bafc12eb3d75615a2daad116 Author: Xiaozhen Liu <[email protected]> AuthorDate: Wed Jan 14 11:23:39 2026 -0800 feat(cache): add operator and region statuses on the frontend. --- .../engine/architecture/rpc/controlreturns.proto | 1 + .../controller/execution/ExecutionUtils.scala | 14 ++++++--- .../scheduling/RegionExecutionCoordinator.scala | 14 ++++++--- .../apache/texera/amber/engine/common/Utils.scala | 3 ++ .../app/workspace/component/menu/menu.component.ts | 5 ++- .../workflow-editor/workflow-editor.component.ts | 36 ++++++++++++++++------ .../workspace/service/joint-ui/joint-ui.service.ts | 3 ++ .../model/workflow-action.service.ts | 9 ++++++ .../workspace/types/execute-workflow.interface.ts | 1 + 9 files changed, 66 insertions(+), 20 deletions(-) diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto index 43613b5cfd..0642c09a4a 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto @@ -126,6 +126,7 @@ enum WorkflowAggregatedState { UNKNOWN = 8; KILLED = 9; TERMINATED = 10; + COMPLETED_FROM_CACHE = 11; } message StartWorkflowResponse { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala index 7ee9bc0473..2b028a5137 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/ExecutionUtils.scala @@ -50,7 +50,8 @@ object ExecutionUtils { WorkflowAggregatedState.RUNNING, WorkflowAggregatedState.UNINITIALIZED, WorkflowAggregatedState.PAUSED, - WorkflowAggregatedState.READY + WorkflowAggregatedState.READY, + Some(WorkflowAggregatedState.COMPLETED_FROM_CACHE) ) def sumMetrics( @@ -88,15 +89,20 @@ object ExecutionUtils { runningState: T, uninitializedState: T, pausedState: T, - readyState: T + readyState: T, + cachedState: Option[T] = None ): WorkflowAggregatedState = { states match { case _ if states.isEmpty => WorkflowAggregatedState.UNINITIALIZED case _ if states.forall(_ == completedState) => WorkflowAggregatedState.COMPLETED case _ if states.forall(_ == terminatedState) => WorkflowAggregatedState.COMPLETED - case _ if states.exists(_ == runningState) => WorkflowAggregatedState.RUNNING + case _ if cachedState.isDefined && states.forall(_ == cachedState.get) => + WorkflowAggregatedState.COMPLETED_FROM_CACHE + case _ if states.exists(_ == runningState) => WorkflowAggregatedState.RUNNING case _ => - val unCompletedStates = states.filter(_ != completedState) + val terminalStates = + Set(completedState, terminatedState) ++ cachedState.toSet + val unCompletedStates = states.filterNot(terminalStates.contains) if (unCompletedStates.forall(_ == uninitializedState)) { WorkflowAggregatedState.UNINITIALIZED } else if (unCompletedStates.forall(_ == pausedState)) { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index e0b6aca821..9f7fe80adb 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -115,6 +115,7 @@ class RegionExecutionCoordinator( private case object ExecutingDependeePortsPhase extends RegionExecutionPhase private case object ExecutingNonDependeePortsPhase extends RegionExecutionPhase private case object Completed extends RegionExecutionPhase + private case object CompletedFromCache extends RegionExecutionPhase private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new AtomicReference( Unexecuted @@ -149,7 +150,7 @@ class RegionExecutionCoordinator( .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L))) .toSeq val stats = OperatorMetrics( - WorkflowAggregatedState.COMPLETED, + WorkflowAggregatedState.COMPLETED_FROM_CACHE, OperatorStatistics( inputMetrics, outputMetrics @@ -161,7 +162,7 @@ class RegionExecutionCoordinator( asyncRPCClient.sendToClient( ExecutionStatsUpdate(workflowExecution.getAllRegionExecutionsStats) ) - setPhase(Completed) + setPhase(CompletedFromCache) } private def recordCachedOutputPortResults(resourceConfig: ResourceConfig): Unit = { @@ -257,7 +258,10 @@ class RegionExecutionCoordinator( } } - def isCompleted: Boolean = currentPhaseRef.get == Completed + def isCompleted: Boolean = { + val phase = currentPhaseRef.get + phase == Completed || phase == CompletedFromCache + } /** * This will sync and transition the region execution phase from one to another depending on its current phase: @@ -286,8 +290,8 @@ class RegionExecutionCoordinator( } case ExecutingNonDependeePortsPhase => tryCompleteRegionExecution() - case Completed => - // Already completed, no further action needed. + case Completed | CompletedFromCache => + // Already completed or used cache, no further action needed. Future.Unit } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala index dc074c1094..e95e1eec91 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala @@ -103,6 +103,7 @@ object Utils extends LazyLogging { case WorkflowAggregatedState.FAILED => "Failed" case WorkflowAggregatedState.KILLED => "Killed" case WorkflowAggregatedState.UNKNOWN => "Unknown" + case WorkflowAggregatedState.COMPLETED_FROM_CACHE => "CompletedFromCache" case WorkflowAggregatedState.Unrecognized(unrecognizedValue) => s"Unrecognized($unrecognizedValue)" } @@ -122,6 +123,7 @@ object Utils extends LazyLogging { case "killed" => WorkflowAggregatedState.KILLED case "terminated" => WorkflowAggregatedState.TERMINATED case "unknown" => WorkflowAggregatedState.UNKNOWN + case "completedfromcache" => WorkflowAggregatedState.COMPLETED_FROM_CACHE case other => throw new IllegalArgumentException(s"Unrecognized state: $other") } } @@ -141,6 +143,7 @@ object Utils extends LazyLogging { case WorkflowAggregatedState.COMPLETED => 3 case WorkflowAggregatedState.FAILED => 4 case WorkflowAggregatedState.KILLED => 5 + case WorkflowAggregatedState.COMPLETED_FROM_CACHE => 6 case other => -1 } } diff --git a/frontend/src/app/workspace/component/menu/menu.component.ts b/frontend/src/app/workspace/component/menu/menu.component.ts index e821bf3add..a1ef27aa6b 100644 --- a/frontend/src/app/workspace/component/menu/menu.component.ts +++ b/frontend/src/app/workspace/component/menu/menu.component.ts @@ -471,10 +471,13 @@ export class MenuComponent implements OnInit, OnDestroy { } public toggleRegion(): void { + // Update shared state so WorkflowEditorComponent can use it when creating region elements + this.workflowActionService.setShowRegion(this.showRegion); + // Apply visibility to any existing region elements this.workflowActionService .getJointGraphWrapper() .jointGraph.getElements() - .filter(el => el.get("type") === "region") // small improvement here too + .filter(el => el.get("type") === "region") .forEach(el => el.attr("body/visibility", this.showRegion ? "visible" : "hidden")); } diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index b23f92caf3..2d43f80a61 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -351,21 +351,39 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy ); let regionMap: { regionElement: joint.dia.Element; operators: joint.dia.Cell[] }[] = []; + // Track whether regions should be visible (preserved across element recreation) + let regionsVisible = false; + const colorMap: Record<string, string> = { + ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)", + ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)", + Completed: "rgba(76,175,80,0.2)", + CompletedFromCache: "rgba(24,144,255,0.3)", + }; + // update region elements on execution this.executeWorkflowService .getRegionUpdateStream() .pipe(untilDestroyed(this)) .subscribe(event => { - this.paper.model - .getCells() - .filter(element => element instanceof Region) - .forEach(element => element.remove()); + // Preserve visibility state from existing elements before removing them + const existingRegions = this.paper.model.getCells().filter(element => element instanceof Region); + if (existingRegions.length > 0) { + regionsVisible = existingRegions[0].attr("body/visibility") === "visible"; + } else { + // No existing regions - use the shared state from workflowActionService + regionsVisible = this.workflowActionService.getShowRegion(); + } + existingRegions.forEach(element => element.remove()); regionMap = event.regions.map(([id, region]) => { const element = new Region({ id: "region-" + id }); const ops = region.map(id => this.paper.getModelById(id)); this.paper.model.addCell(element); this.updateRegionElement(element, ops); + // Apply visibility state + if (regionsVisible) { + element.attr("body/visibility", "visible"); + } return { regionElement: element, operators: ops }; }); }); @@ -381,12 +399,10 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy .getRegionStateStream() .pipe(untilDestroyed(this)) .subscribe(region => { - const colorMap: Record<string, string> = { - ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)", - ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)", - Completed: "rgba(76,175,80,0.2)", - }; - this.paper.getModelById("region-" + region.id).attr("body/fill", colorMap[region.state]); + const element = this.paper.getModelById("region-" + region.id); + if (element && colorMap[region.state]) { + element.attr("body/fill", colorMap[region.state]); + } }); } diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts index 10d0348773..4f9b984b4f 100644 --- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts +++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts @@ -404,6 +404,9 @@ export class JointUIService { case OperatorState.Completed: fillColor = "green"; break; + case OperatorState.CompletedFromCache: + fillColor = "#1890ff"; + break; case OperatorState.Pausing: case OperatorState.Paused: fillColor = "magenta"; diff --git a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts index ddee22465a..a6071e5f95 100644 --- a/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts +++ b/frontend/src/app/workspace/service/workflow-graph/model/workflow-action.service.ts @@ -97,6 +97,7 @@ export class WorkflowActionService { private workflowSettings: WorkflowSettings; private workflowResetSubject = new Subject<void>(); + private _showRegion: boolean = false; constructor( private operatorMetadataService: OperatorMetadataService, @@ -919,4 +920,12 @@ export class WorkflowActionService { public getHighlightingEnabled() { return this.highlightingEnabled; } + + public setShowRegion(show: boolean): void { + this._showRegion = show; + } + + public getShowRegion(): boolean { + return this._showRegion; + } } diff --git a/frontend/src/app/workspace/types/execute-workflow.interface.ts b/frontend/src/app/workspace/types/execute-workflow.interface.ts index 23ade23199..fcc22059f1 100644 --- a/frontend/src/app/workspace/types/execute-workflow.interface.ts +++ b/frontend/src/app/workspace/types/execute-workflow.interface.ts @@ -76,6 +76,7 @@ export enum OperatorState { Resuming = "Resuming", Completed = "Completed", Recovering = "Recovering", + CompletedFromCache = "CompletedFromCache", } export interface OperatorStatistics
