This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit a8057585fe9606ec24beee82c36d21641b54dc32
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Wed Jan 14 14:08:50 2026 -0800

    feat(cache): update doc and number display for operators completing from 
cache.
---
 .../scheduling/RegionExecutionCoordinator.scala    | 17 +++++----
 docs/operator-port-cache.md                        | 40 ++++++++++++++++++++--
 .../workspace/service/joint-ui/joint-ui.service.ts | 11 +++---
 3 files changed, 52 insertions(+), 16 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 9f7fe80adb..cc409ea098 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -137,15 +137,14 @@ class RegionExecutionCoordinator(
     region.getOperators.foreach { op =>
       val opExecution = regionExecution.initOperatorExecution(op.id)
       // Cached regions do not create workers; synthesize operator-level 
metrics instead.
-      val outputMetrics = op.outputPorts.keys.map { pid =>
-        val count = resourceConfig.portConfigs
-          .collectFirst {
-            case (gpid, cfg: OutputPortConfig) if gpid == 
GlobalPortIdentity(op.id, pid) =>
-              cfg.cachedTupleCount.getOrElse(0L)
-          }
-          .getOrElse(0L)
-        PortTupleMetricsMapping(pid, TupleMetrics(count, 0L))
-      }.toSeq
+      val outputMetrics = resourceConfig.portConfigs
+        .collect {
+          case (gpid, cfg: OutputPortConfig) if gpid.opId == op.id =>
+            // Only emit metrics for materialized outputs; UI treats missing 
ports as skipped.
+            val count = cfg.cachedTupleCount.getOrElse(0L)
+            PortTupleMetricsMapping(gpid.portId, TupleMetrics(count, 0L))
+        }
+        .toSeq
       val inputMetrics = op.inputPorts.keys
         .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L)))
         .toSeq
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index 940d0490fa..8fab7a1a59 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -40,8 +40,8 @@ ToSkip regions create lightweight state structures (Workflow 
→ Region → Oper
 ### 5. Stats Emission via Direct Client Updates
 Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via 
`asyncRPCClient.sendToClient()`, with cached operator metrics (`numWorkers=0`). 
This reuses existing stats infrastructure without special-casing the frontend.
 
-### 6. No Explicit Cache Flag in Metrics
-Cached execution is inferred from `numWorkers=0` + instant completion, rather 
than adding a `from_cache` flag to protobuf messages. This minimizes protocol 
changes.
+### 6. Explicit Cache State in Metrics
+Cached operators use a dedicated `COMPLETED_FROM_CACHE` state in 
`WorkflowAggregatedState` protobuf enum. This provides clear visual feedback to 
users and distinguishes cache-hit completion from normal execution completion.
 
 ### 7. Deferred Lifecycle Management
 V1 assumes unlimited storage. Eviction, TTL, and garbage collection are 
research topics for future work, not implementation blockers.
@@ -119,6 +119,8 @@ Entry point: `RegionExecutionCoordinator` constructor 
branches on `region.cached
    - `numWorkers = 0`
    - `dataProcessingTime = 0`, `controlProcessingTime = 0`, `idleTime = 0`
    - Input/output tuple counts from cached metadata
+   - **UI**: The graph view displays `-` for cached input counts and for 
cached output ports that were not materialized.
+   - **Note**: Cached stats are synthetic (inputs default to 0; 
non-materialized outputs may be omitted). Do not use them for cost modeling 
until we add explicit tagging/filtering in `runtime_statistics`.
 5. **Propagate cached URIs**: Downstream operators receive cached `result_uri` 
for materialized inputs
 6. **No WorkerAssignmentUpdate**: Cached regions don't send worker assignment 
events (consistent with numWorkers=0)
 7. **Set phase to Completed**: Region lifecycle completes immediately
@@ -292,6 +294,16 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
   - ToSkip regions: `completeCachedRegion()` records cached operator metrics 
(numWorkers=0, processingTime=0) without creating workers, propagates cached 
URIs downstream
   - ToExecute regions: normal execution path
 - **Stats emission**: Cached regions emit `ExecutionStatsUpdate` via direct 
client updates, maintaining consistency with normal execution lifecycle
+- **Frontend cache visualization** (Phase 1.2 - Complete):
+  - Added `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf 
enum
+  - Added `CompletedFromCache` phase to `RegionExecutionCoordinator` for 
cached region lifecycle
+  - Backend state conversion in `Utils.scala`: `aggregatedStateToString`, 
`stringToAggregatedState`, `maptoStatusCode`
+  - State aggregation in `ExecutionUtils.aggregateStates()` handles 
`COMPLETED_FROM_CACHE` as terminal state
+  - Frontend `OperatorState` enum includes `CompletedFromCache`
+  - Operator visualization: blue fill color (`#1890ff`) for cached operators 
in `joint-ui.service.ts`
+  - Port metrics display: cached input counts show `-`, and cached output 
ports without materialization show `-`
+  - Region visualization: blue fill (`rgba(24,144,255,0.3)`) for cached 
regions in `workflow-editor.component.ts`
+  - Region visibility: shared state via `WorkflowActionService.showRegion` 
ensures correct visibility when regions are created during execution
 
 ### Architecture Integration
 The cache system integrates with three layers:
@@ -390,7 +402,29 @@ The cache system integrates with three layers:
 - [ ] Add unit tests for DAO operations
 - [ ] (Optional) Add REST endpoints in `WorkflowExecutionsResource` that 
delegate to service
 
-#### 1.2 Testing & Validation
+#### 1.2 Frontend Cache Visualization ✓ COMPLETE
+- [x] Add `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf 
enum
+  - Location: `/amber/src/main/protobuf/.../controlreturns.proto`
+- [x] Add `CompletedFromCache` phase to `RegionExecutionCoordinator`
+  - Cached regions use this phase instead of `Completed`
+  - Location: 
`/amber/src/main/scala/.../scheduling/RegionExecutionCoordinator.scala`
+- [x] Update backend state conversion functions in `Utils.scala`
+  - `aggregatedStateToString`: `COMPLETED_FROM_CACHE` → `"CompletedFromCache"`
+  - `stringToAggregatedState`: `"completedfromcache"` → `COMPLETED_FROM_CACHE`
+  - `maptoStatusCode`: `COMPLETED_FROM_CACHE` → `6`
+- [x] Update `ExecutionUtils.aggregateStates()` to handle 
`COMPLETED_FROM_CACHE`
+  - Treated as terminal state alongside `COMPLETED` and `TERMINATED`
+- [x] Add `CompletedFromCache` to frontend `OperatorState` enum
+  - Location: `/frontend/src/app/workspace/types/execute-workflow.interface.ts`
+- [x] Add blue fill color for cached operators in `joint-ui.service.ts`
+  - Color: `#1890ff` (Ant Design blue)
+- [x] Add blue fill for cached regions in `workflow-editor.component.ts`
+  - Color: `rgba(24,144,255,0.3)` (translucent blue)
+- [x] Show `-` for cached input counts and cached output ports without 
materialization
+- [x] Fix region visibility with shared state via 
`WorkflowActionService.showRegion`
+  - Ensures regions show correctly when user toggles visibility before 
execution
+
+#### 1.3 Testing & Validation
 - [ ] Verify downstream cached URI consumption across all operator types
 - [ ] Add integration tests: cache upsert → DB verification
 - [ ] Add E2E tests: run → cache → rerun → verify skip
diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts 
b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
index 4f9b984b4f..a07606611f 100644
--- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
+++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
@@ -320,6 +320,8 @@ export class JointUIService {
 
     const inputMetrics = statistics.inputPortMetrics;
     const outputMetrics = statistics.outputPortMetrics;
+    // Cached operators show "-" for inputs and non-materialized outputs.
+    const isSkippedFromCache = statistics.operatorState === 
OperatorState.CompletedFromCache;
 
     const workerCount = statistics.numWorkers ?? 1;
     element.attr(`.${operatorWorkerCountClass}/text`, "#workers: " + 
String(workerCount));
@@ -330,7 +332,7 @@ export class JointUIService {
         const parts = portId.split("-");
         const numericSuffix = parts.length > 1 ? parts[1] : portId;
 
-        const count: number = inputMetrics[numericSuffix] ?? 0;
+        const count = inputMetrics[numericSuffix];
         const rawAttrs = (portDef.attrs as any) || {};
         const oldText: string = (rawAttrs[".port-label"] && 
rawAttrs[".port-label"].text) || "";
         let originalName = oldText.includes(":") ? oldText.split(":", 
1)[0].trim() : oldText;
@@ -339,7 +341,7 @@ export class JointUIService {
           originalName = portId;
         }
 
-        const labelText = `${count}`;
+        const labelText = isSkippedFromCache ? "-" : String(count ?? 0);
         element.portProp(portId, "attrs/.port-label/text", labelText);
       }
     });
@@ -350,7 +352,7 @@ export class JointUIService {
         const parts = portId.split("-");
         const numericSuffix = parts.length > 1 ? parts[1] : portId;
 
-        const count: number = outputMetrics[numericSuffix] ?? 0;
+        const count = outputMetrics[numericSuffix];
         const rawAttrs = (portDef.attrs as any) || {};
         const oldText: string = (rawAttrs[".port-label"] && 
rawAttrs[".port-label"].text) || "";
         let originalName = oldText.includes(":") ? oldText.split(":", 
1)[0].trim() : oldText;
@@ -359,7 +361,8 @@ export class JointUIService {
           originalName = portId;
         }
 
-        const labelText = `${count}`;
+        const labelText =
+          isSkippedFromCache && count === undefined ? "-" : String(count ?? 0);
 
         element.portProp(portId, "attrs/.port-label/text", labelText);
       }

Reply via email to