This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit c3fd3b1b535e94e68034b7d113e9c0b1e24a1738
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Jun 1 21:31:17 2026 -0700

    fix(amber): fix incorrectly resolved merge conflicts.
---
 .../controller/CacheReusePreSchedulingStep.scala   | 23 +++++-
 .../WorkerExecutionCompletedHandler.scala          |  5 +-
 .../scheduling/RegionExecutionCoordinator.scala    |  5 +-
 .../scheduling/WorkflowExecutionCoordinator.scala  | 87 ++++++++++------------
 .../cache-panel/cache-panel.component.ts           | 20 +++++
 .../context-menu/context-menu.component.html       |  8 +-
 .../workflow-editor/workflow-editor.component.ts   | 21 ++----
 .../workspace/service/joint-ui/joint-ui.service.ts | 38 ----------
 8 files changed, 93 insertions(+), 114 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala
index 122eadf2da..14cafded9d 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala
@@ -19,6 +19,7 @@
 
 package org.apache.texera.amber.engine.architecture.controller
 
+import org.apache.texera.amber.core.storage.VFSURIFactory
 import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
 import org.apache.texera.amber.core.workflow.{
   CachedOutput,
@@ -199,6 +200,22 @@ object CacheReusePreSchedulingStep {
     linksPrunedPlan.getSubPlan(executeOperators)
   }
 
+  /**
+    * Cache entries persist the `/result` leaf URI, but scheduling port 
configs and the input-port
+    * materialization readers expect the port *base* URI: they derive 
`resultURI`/`stateURI` from it
+    * via `VFSURIFactory`. Reconstruct the base from a cached `/result` URI so 
reuse-only ports line
+    * up with the (post state-materialization) base/result/state storage 
layout.
+    */
+  private def portBaseURIOf(cachedResultUri: URI): URI = {
+    val (workflowId, executionId, globalPortIdOpt, _) = 
VFSURIFactory.decodeURI(cachedResultUri)
+    val globalPortId = globalPortIdOpt.getOrElse(
+      throw new IllegalArgumentException(
+        s"Cached result URI is missing a globalPortId: $cachedResultUri"
+      )
+    )
+    VFSURIFactory.createPortBaseURI(workflowId, executionId, globalPortId)
+  }
+
   /**
     * Prepare generic planning hints for CostBasedScheduleGenerator.
     */
@@ -214,7 +231,7 @@ object CacheReusePreSchedulingStep {
       .flatMap { pid =>
         cachedOutputsByPort.get(pid).map { cached =>
           pid -> OutputPortConfig(
-            storageURI = cached.resultUri,
+            storageURIBase = portBaseURIOf(cached.resultUri),
             cachedTupleCount = cached.tupleCount,
             materialize = false
           )
@@ -230,7 +247,7 @@ object CacheReusePreSchedulingStep {
           val outputPort = GlobalPortIdentity(link.fromOpId, link.fromPortId)
           cachedOutputsByPort.get(outputPort) match {
             case Some(cached) =>
-              val uris = acc.getOrElse(inputPort, List.empty) :+ 
cached.resultUri
+              val uris = acc.getOrElse(inputPort, List.empty) :+ 
portBaseURIOf(cached.resultUri)
               acc.updated(inputPort, uris)
             case None =>
               acc
@@ -265,7 +282,7 @@ object CacheReusePreSchedulingStep {
         .flatMap { outputPort =>
           cachedOutputsByPort.get(outputPort).map { cached =>
             outputPort -> OutputPortConfig(
-              storageURI = cached.resultUri,
+              storageURIBase = portBaseURIOf(cached.resultUri),
               cachedTupleCount = cached.tupleCount,
               materialize = false
             )
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index 01d18628d3..52155a6154 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -20,10 +20,7 @@
 package org.apache.texera.amber.engine.architecture.controller.promisehandlers
 
 import com.twitter.util.Future
-import org.apache.texera.amber.engine.architecture.controller.{
-  ControllerAsyncRPCHandlerInitializer,
-  ExecutionStateUpdate
-}
+import 
org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
   EmptyRequest,
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index aa1ff1fd43..d48c80cd41 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -38,7 +38,6 @@ import 
org.apache.texera.amber.engine.architecture.controller.execution.{
 }
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerConfig,
-  ExecutionStateUpdate,
   ExecutionStatsUpdate,
   RuntimeStatisticsPersist,
   WorkerAssignmentUpdate
@@ -176,7 +175,7 @@ class RegionExecutionCoordinator(
   private def recordCachedOutputPortResults(resourceConfig: ResourceConfig): 
Unit = {
     resourceConfig.portConfigs.collect {
       case (gpid, cfg: OutputPortConfig) =>
-        val storageUri = cfg.storageURI
+        val storageUri = VFSURIFactory.resultURI(cfg.storageURIBase)
         WorkflowExecutionsResource.insertOperatorPortResultUri(
           eid = executionId,
           globalPortId = gpid,
@@ -598,7 +597,7 @@ class RegionExecutionCoordinator(
         WorkflowExecutionsResource.insertOperatorPortResultUri(
           eid = executionId,
           globalPortId = outputPortId,
-          uri = outputCfg.storageURI
+          uri = VFSURIFactory.resultURI(outputCfg.storageURIBase)
         )
       case _ =>
     }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 161a116960..212a137918 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -26,8 +26,6 @@ import org.apache.texera.amber.engine.architecture.common.{
   PekkoActorRefMappingService,
   PekkoActorService
 }
-import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
-import 
org.apache.texera.amber.engine.architecture.controller.ExecutionStateUpdate
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerConfig,
   ExecutionStateUpdate
@@ -89,54 +87,51 @@ class WorkflowExecutionCoordinator(
     }
 
     // All existing regions are completed. Start the next region (if any).
-    val nextRegions = getNextRegions()
+    val nextRegions = if (!schedule.hasNext) Set.empty[Region] else 
schedule.next()
     if (nextRegions.isEmpty) {
-      if (regionExecutionCoordinators.values.forall(_.isCompleted)) {
-        asyncRPCClient.sendToClient(
-          ExecutionStateUpdate(workflowExecution.getState)
-        )
+      if (workflowExecution.isCompleted && 
completionNotified.compareAndSet(false, true)) {
+        
asyncRPCClient.sendToClient(ExecutionStateUpdate(workflowExecution.getState))
       }
-      Future.Unit
-    } else {
-      executedRegions.append(nextRegions)
-      val launches = nextRegions
-        .map(region => {
-          workflowExecution.initRegionExecution(region)
-          regionExecutionCoordinators(region.id) = new 
RegionExecutionCoordinator(
-            region,
-            workflowExecution,
-            executionId,
-            asyncRPCClient,
-            controllerConfig,
-            actorService,
-            actorRefService
-          )
-          regionExecutionCoordinators(region.id)
-        })
-        .map(_.syncStatusAndTransitionRegionExecutionPhase())
-        .toSeq
-      Future
-        .collect(launches)
-        .unit
-        .flatMap { _ =>
-          if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
-            Future.Unit
-          } else {
-            // All launched regions finished immediately (e.g., cached); 
proceed to next batch.
-            coordinateRegionExecutors(actorService)
-          }
-        }
-        .map { _ =>
-          if (
-            regionExecutionCoordinators.values.forall(_.isCompleted) &&
-            workflowExecution.isCompleted
-          ) {
-            asyncRPCClient.sendToClient(
-              ExecutionStateUpdate(workflowExecution.getState)
+      return Future.Unit
+    }
+
+    executedRegions.append(nextRegions)
+    Future
+      .collect(
+        nextRegions
+          .map(region => {
+            val isRestart = workflowExecution.hasRegionExecution(region.id)
+            if (isRestart) {
+              workflowExecution.restartRegionExecution(region)
+            } else {
+              workflowExecution.initRegionExecution(region)
+            }
+            regionExecutionCoordinators(region.id) = new 
RegionExecutionCoordinator(
+              region,
+              isRestart,
+              workflowExecution,
+              executionId,
+              asyncRPCClient,
+              controllerConfig,
+              actorService,
+              actorRefService
             )
-          }
+            regionExecutionCoordinators(region.id)
+          })
+          .map(_.syncStatusAndTransitionRegionExecutionPhase())
+          .toSeq
+      )
+      .unit
+      .flatMap { _ =>
+        if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
+          Future.Unit
+        } else {
+          // All launched regions finished immediately (e.g., cached); proceed 
to the next batch.
+          // Cached regions have no workers, so no port-completion event would 
otherwise re-trigger
+          // coordination, and completion is emitted once via the guard above 
when the schedule drains.
+          coordinateRegionExecutors(actorService)
         }
-    }
+      }
   }
 
   def getRegionOfLink(link: PhysicalLink): Region = {
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
index 5e9e0b9d18..ae738e1e99 100644
--- 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
@@ -19,6 +19,15 @@
 
 import { Component, OnInit } from "@angular/core";
 import { ActivatedRoute } from "@angular/router";
+import { CommonModule } from "@angular/common";
+import { FormsModule } from "@angular/forms";
+import { NzButtonModule } from "ng-zorro-antd/button";
+import { NzPopconfirmModule } from "ng-zorro-antd/popconfirm";
+import { NzSwitchModule } from "ng-zorro-antd/switch";
+import { NzAlertModule } from "ng-zorro-antd/alert";
+import { NzTableModule } from "ng-zorro-antd/table";
+import { NzTagModule } from "ng-zorro-antd/tag";
+import { NzEmptyModule } from "ng-zorro-antd/empty";
 import { finalize, tap } from "rxjs/operators";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
 import { WorkflowCacheEntry } from 
"../../../../dashboard/type/workflow-cache-entry";
@@ -39,6 +48,17 @@ import {
   selector: "texera-cache-panel",
   templateUrl: "cache-panel.component.html",
   styleUrls: ["cache-panel.component.scss"],
+  imports: [
+    CommonModule,
+    FormsModule,
+    NzButtonModule,
+    NzPopconfirmModule,
+    NzSwitchModule,
+    NzAlertModule,
+    NzTableModule,
+    NzTagModule,
+    NzEmptyModule,
+  ],
 })
 export class CachePanelComponent implements OnInit {
   public cacheEntries: WorkflowCacheEntry[] = [];
diff --git 
a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html
 
b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html
index 32c5c9a9b4..9d963aaf42 100644
--- 
a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html
+++ 
b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html
@@ -125,8 +125,8 @@
   </li>
   <li
     nz-menu-item
-    *ngIf="operatorMenuService.highlightedOperators.value.length === 1 &&
-  operatorMenuService.highlightedCommentBoxes.value.length === 0 &&
+    *ngIf="highlightedOperatorIds.length === 1 &&
+  highlightedCommentBoxIds.length === 0 &&
   !hasHighlightedLinks() &&
   isWorkflowModifiable"
     (click)="clearCacheForSelectedOperator()">
@@ -138,8 +138,8 @@
   </li>
   <li
     nz-menu-item
-    *ngIf="highlightedOperators.value.length === 1 &&
-    highlightedCommentBoxes.value.length === 0 &&
+    *ngIf="highlightedOperatorIds.length === 1 &&
+  highlightedCommentBoxIds.length === 0 &&
   !hasHighlightedLinks() &&
   isWorkflowModifiable"
     (click)="clearCacheUpToSelectedOperator()">
diff --git 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
index 39601d9347..bf4a1180ab 100644
--- 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
+++ 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
@@ -31,7 +31,7 @@ import { CacheUsageService } from 
"../../service/workflow-status/cache-usage.ser
 import { WorkflowCacheEntriesService } from 
"../../service/workflow-status/workflow-cache-entries.service";
 import { WorkflowStatusService } from 
"../../service/workflow-status/workflow-status.service";
 import { ExecutionState, OperatorState } from 
"../../types/execute-workflow.interface";
-import { LogicalPort, OperatorLink, OperatorPredicate } from 
"../../types/workflow-common.interface";
+import { LogicalPort, OperatorLink } from 
"../../types/workflow-common.interface";
 import { WorkflowCacheEntry } from 
"../../../dashboard/type/workflow-cache-entry";
 import { auditTime, filter, map, takeUntil, withLatestFrom } from 
"rxjs/operators";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
@@ -480,8 +480,6 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
     );
 
     let regionMap: { regionElement: joint.dia.Element; operators: 
joint.dia.Cell[] }[] = [];
-    // Track whether regions should be visible (preserved across element 
recreation)
-    let regionsVisible = false;
     const colorMap: Record<string, string> = {
       ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)",
       ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)",
@@ -494,25 +492,16 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
       .getRegionUpdateStream()
       .pipe(untilDestroyed(this))
       .subscribe(event => {
-        // Preserve visibility state from existing elements before removing 
them
-        const existingRegions = this.paper.model.getCells().filter(element => 
element instanceof Region);
-        if (existingRegions.length > 0) {
-          regionsVisible = existingRegions[0].attr("body/visibility") === 
"visible";
-        } else {
-          // No existing regions - use the shared state from 
workflowActionService
-          regionsVisible = this.workflowActionService.getShowRegion();
-        }
-        existingRegions.forEach(element => element.remove());
+        this.paper.model
+          .getCells()
+          .filter(element => element instanceof Region)
+          .forEach(element => element.remove());
 
         regionMap = event.regions.map(([id, region]) => {
           const element = new Region({ id: "region-" + id });
           const ops = region.map(id => this.paper.getModelById(id));
           this.paper.model.addCell(element);
           this.updateRegionElement(element, ops);
-          // Apply visibility state
-          if (regionsVisible) {
-            element.attr("body/visibility", "visible");
-          }
           return { regionElement: element, operators: ops };
         });
         // regions are recreated on every update, so reapply the current 
toggle state to the new elements
diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts 
b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
index 0320e971fa..2d03f71aab 100644
--- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
+++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
@@ -423,9 +423,6 @@ export class JointUIService {
       if (portId != null) {
         const parts = portId.split("-");
         const numericSuffix = parts.length > 1 ? parts[1] : portId;
-        const count: number = inputMetrics[numericSuffix] ?? 0;
-        element.portProp(portId, "attrs/.port-label/text", 
count.toLocaleString());
-
         const count = inputMetrics[numericSuffix];
         const rawAttrs = (portDef.attrs as any) || {};
         const oldText: string = (rawAttrs[".port-label"] && 
rawAttrs[".port-label"].text) || "";
@@ -448,7 +445,6 @@ export class JointUIService {
         const parts = portId.split("-");
         const numericSuffix = parts.length > 1 ? parts[1] : portId;
         const count: number = outputMetrics[numericSuffix] ?? 0;
-        element.portProp(portId, "attrs/.port-label/text", 
count.toLocaleString());
         const rawAttrs = (portDef.attrs as any) || {};
         const oldText: string = (rawAttrs[".port-label"] && 
rawAttrs[".port-label"].text) || "";
         let originalName = oldText.includes(":") ? oldText.split(":", 
1)[0].trim() : oldText;
@@ -842,40 +838,6 @@ export class JointUIService {
     ];
   }
 
-  /**
-   * This function create a custom svg style for the operator
-   * @returns the custom attributes of the tooltip.
-   */
-  public static getCustomOperatorStatusTooltipStyleAttrs(): 
joint.shapes.devs.ModelSelectors {
-    return {
-      "element-node": {
-        style: { "pointer-events": "none" },
-      },
-      polygon: {
-        fill: "#FFFFFF",
-        "follow-scale": true,
-        stroke: "purple",
-        "stroke-width": "2",
-        rx: "5px",
-        ry: "5px",
-        refPoints: "0,30 150,30 150,120 85,120 75,150 65,120 0,120",
-        display: "none",
-        style: { "pointer-events": "none" },
-      },
-      "#operatorCount": {
-        fill: "#595959",
-        "font-size": "12px",
-        ref: "polygon",
-        "y-alignment": "middle",
-        "x-alignment": "left",
-        "ref-x": 0.05,
-        "ref-y": 0.2,
-        display: "none",
-        style: { "pointer-events": "none" },
-      },
-    };
-  }
-
   /**
    * This function creates a custom svg style for the operator.
    * This function also makes the delete button defined above to emit the 
delete event that will

Reply via email to