This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 9a037f52a010ff0ed90d9184624ed1e940a858c0
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Wed Jan 14 21:49:46 2026 -0800

    feat(cache): minor improvements to cache display panel.
---
 docs/operator-port-cache.md                        | 142 +++++++++++++++------
 .../cache-panel/cache-panel.component.html         |  27 ++--
 .../cache-panel/cache-panel.component.scss         |  13 +-
 .../cache-panel/cache-panel.component.ts           |  20 ++-
 4 files changed, 152 insertions(+), 50 deletions(-)

diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index 7f627755e5..0d1348c740 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -12,6 +12,7 @@ Enable **incremental workflow execution** through cost-aware 
caching of operator
 **Key principle**: The physical plan remains unchanged; cache-or-recompute 
decisions are made by the scheduler (Pasta / CostBasedScheduleGenerator) based 
on cache metadata keyed by a deterministic SHA-256 fingerprint of the upstream 
sub-DAG.
 
 **Research goals**:
+
 - Extend Pasta scheduler with cost-based caching decisions
 - Develop cost models and pruning heuristics for what/when to cache
 - Evaluate speedup on iterative data science workflows
@@ -20,33 +21,43 @@ Enable **incremental workflow execution** through 
cost-aware caching of operator
 ## Key Design Decisions
 
 ### 1. Physical Plan Immutability
+
 The physical plan is never modified to accommodate caching. Cache decisions 
are metadata passed to the scheduler via `WorkflowSettings.cachedOutputs`. This 
preserves the integrity of the Pasta scheduling framework.
 
 ### 2. Fingerprint-Based Correctness
+
 Cache keys use SHA-256 hashes of canonical upstream sub-DAG representations 
(operators + schemas + edges + init info). This ensures:
+
 - Deterministic matching: same computation = same fingerprint
 - Automatic invalidation: any upstream change = different fingerprint = cache 
miss
 - No explicit dependency tracking needed
 
 ### 3. Region Homogeneity Constraint
+
 A region is either fully cached (ToSkip) or fully executed (ToExecute) — no 
partial execution within a region. This simplifies:
+
 - Scheduler logic (binary cached flag per region)
 - Runtime state management (consistent execution mode)
 - Cost model (compare full region costs)
 
 ### 4. Shallow State Hierarchy for Cached Regions
+
 ToSkip regions create lightweight state structures (Workflow → Region → 
Operator/Link) and store cached metrics at the operator level. No 
Worker/Channel states are created, so `numWorkers=0` and no worker assignments 
are emitted.
 
 ### 5. Stats Emission via Direct Client Updates
+
 Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via 
`asyncRPCClient.sendToClient()`, with cached operator metrics (`numWorkers=0`). 
This reuses existing stats infrastructure without special-casing the frontend.
 
 ### 6. Explicit Cache State in Metrics
+
 Cached operators use a dedicated `COMPLETED_FROM_CACHE` state in 
`WorkflowAggregatedState` protobuf enum. This provides clear visual feedback to 
users and distinguishes cache-hit completion from normal execution completion.
 
 ### 7. Deferred Lifecycle Management
+
 V1 assumes unlimited storage. Eviction, TTL, and garbage collection are 
research topics for future work, not implementation blockers.
 
 ## Data Model
+
 - Table `operator_port_cache` (PK `(workflow_id, global_port_id, 
subdag_hash)`):
   - `fingerprint_json`: canonical JSON of the upstream sub‑DAG.
   - `subdag_hash`: SHA-256 of `fingerprint_json`.
@@ -57,6 +68,7 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage 
collection are research
 - Status: schema + migration added (`sql/updates/cache.sql`).
 
 ## Fingerprint
+
 - Utility: `FingerprintUtil.computeSubdagFingerprint(physicalPlan, 
globalPortId) -> (fingerprintJson, subdagHash)`.
 - Canonical payload (sorted):
   - Target port ID.
@@ -67,6 +79,7 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage 
collection are research
 ## End-to-End Workflow
 
 ### 1. Cache Lookup (Submission Time)
+
 **Location**: `WorkflowExecutionService` → `OperatorPortCacheService` → 
`OperatorPortCacheDao`
 
 - Compile logical workflow to `PhysicalPlan`
@@ -79,36 +92,44 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage 
collection are research
 - Store in `WorkflowSettings.cachedOutputs` and pass to scheduler
 
 ### 2. Scheduler Integration (Pasta / CostBasedScheduleGenerator)
+
 **Location**: `CostBasedScheduleGenerator`
 
 **Inputs**:
+
 - Physical plan (immutable)
 - `cachedOutputs` (Δ): map of cache hits from step 1
 - Visible ports (☐): ports that must be materialized for user visibility
 
 **Region Classification**:
+
 - **Homogeneity constraint**: A region is either fully cached (ToSkip) or 
fully executed (ToExecute) — no mixing within a region
 - **ToExecute regions**: Contain visible ports without cache hits OR depend on 
uncached intermediate materializations
 - **ToSkip regions**: All required output ports have cache hits AND no visible 
ports need fresh computation
 
 **Cost Model** (currently simple, needs refinement for research):
+
 - Cached regions: cost = 0
 - Executing regions: cost = (# operators × DEFAULT_OPERATOR_COST) + 
materialization read/write costs
 - Cache read/write: small fixed costs (0.5 per port)
 - **Future**: Historical stats-based estimation from `runtime_statistics` table
 
 **Output**:
+
 - Schedule with regions marked `cached=true/false`
 - Port configs updated with cached URIs for ToSkip regions
 - Cached tuple counts reused in port metadata
 
 ### 3. Runtime Execution
+
 **Location**: `RegionExecutionCoordinator`, `PortCompletedHandler`
 
 #### ToSkip Regions (Cached)
+
 Entry point: `RegionExecutionCoordinator` constructor branches on 
`region.cached` flag
 
 **Execution path**:
+
 1. **Skip operator execution**: Call `completeCachedRegion()` immediately
 2. **State hierarchy** (shallow):
    - Create: Workflow → Region → Operator/Link states
@@ -126,13 +147,16 @@ Entry point: `RegionExecutionCoordinator` constructor 
branches on `region.cached
 7. **Set phase to Completed**: Region lifecycle completes immediately
 
 #### ToExecute Regions (Normal Execution)
+
 **Location**: `PortCompletedHandler` → `PortMaterialized event` → 
`ExecutionCacheService` → `OperatorPortCacheService` → `OperatorPortCacheDao`
 
 1. **Execute operators**: Normal execution path via worker actors
 2. **On output port completion** (`PortCompletedHandler`):
    - Retrieve result URI from 
`WorkflowExecutionsResource.getResultUriByPhysicalPortId`
-  - Retrieve tuple count (best-effort via runtime stats from worker metrics)
-   - Send `PortMaterialized(portId, resultUri, tupleCount)` event to client 
via `sendToClient()`
+
+   - Retrieve tuple count (best-effort via runtime stats from worker metrics)
+   - Send `PortMaterialized(portId, resultUri, tupleCount)` event to client via 
`sendToClient()`
+
 3. **Service layer** (`ExecutionCacheService`):
    - Registered callback via `client.registerCallback[PortMaterialized]` 
receives event
    - Calls `OperatorPortCacheService.upsertCachedOutput(...)`:
@@ -146,35 +170,43 @@ Entry point: `RegionExecutionCoordinator` constructor 
branches on `region.cached
 **Architecture note**: Event-based communication follows existing controller 
pattern - handler emits events via `sendToClient()`, service layer registers 
callbacks to handle them. Clean separation: engine layer knows nothing about 
web/service layer.
 
 ### 4. Client-Side State Management
+
 **Location**: `ExecutionStatsService`, `ExecutionStateStore`
 
 **Stats propagation**:
+
 - `ExecutionStatsUpdate` events (from both cached and normal regions) update 
`ExecutionStatsStore`
 - Frontend receives operator metrics via WebSocket subscription
 - Cached regions appear as instantly completed operators with 0 workers and 0 
processing time
 
 **No structural changes needed**:
+
 - Existing protobuf messages (`OperatorStatistics`) handle cached regions 
naturally
 - No `from_cache` flag required (can infer from numWorkers=0 + instant 
completion)
 - Lifecycle management (eviction, cleanup) deferred to future work
 
 **Cache metadata UI (Implemented)**:
+
 - Left panel "Cache" tab listing workflow cache entries (physical op id, port 
id, tuple count, source execution id, updated_at, short subdag hash)
-- Highlight cache entries matched for the current execution fingerprint
+- Highlight cache entries usable by the current execution (fingerprint match)
+- Cache panel toggle to show only entries usable by the current execution
 - Output port labels show tuple counts, plus a second line with source 
execution id for cached outputs
 - Result URI hidden from the UI
 
 **Cache usage updates**:
-- `CacheUsageUpdateEvent` publishes matched cached outputs at submission time
+
+- `CacheUsageUpdateEvent` publishes cached outputs usable by the current 
execution (fingerprint match)
 - Frontend uses the event to drive cache entry highlighting and per-port cache 
labels
 - Cache usage snapshots are re-emitted on websocket connect to keep labels 
visible after refresh
 
 ### 5. Service & DAO Architecture
 
 #### OperatorPortCacheDao
+
 **Location**: 
`/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala`
 
 Low-level database access using Jooq:
+
 ```scala
 class OperatorPortCacheDao(sqlServer: SqlServer) {
 
@@ -201,9 +233,11 @@ class OperatorPortCacheDao(sqlServer: SqlServer) {
 ```
 
 #### OperatorPortCacheService
+
 **Location**: 
`/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
 
 High-level cache operations with business logic:
+
 ```scala
 class OperatorPortCacheService(dao: OperatorPortCacheDao) {
 
@@ -232,15 +266,18 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) 
{
 ```
 
 **Key responsibilities**:
+
 - Encapsulates fingerprint computation (calls `FingerprintUtil`)
 - Handles `GlobalPortIdentity` ↔ String serialization
 - Manages tuple count propagation (best-effort via runtime stats)
 - Provides workflow-level abstractions (batch lookup, invalidation)
 
 #### WorkflowExecutionsResource (REST API - Optional)
+
 **Location**: 
`/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala`
 
 HTTP endpoints for external access:
+
 - `GET /executions/{workflowId}/cache?limit=<n>&offset=<n>`: List cache 
entries (result_uri omitted)
 - (Optional) `DELETE /cache/{workflowId}`: Manual cache invalidation
 
@@ -250,14 +287,15 @@ HTTP endpoints for external access:
 
 Phase 1.1 Service/DAO architecture is complete. Key components:
 
-| Component | Location | Purpose |
-|-----------|----------|---------|
-| **OperatorPortCacheDao** | 
`/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` | 
Low-level database access using Jooq. Methods: `get()`, `upsert()`, 
`listByWorkflow()`, `deleteByWorkflow()` |
-| **OperatorPortCacheService** | 
`/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
 | High-level cache operations. Methods: `lookupCachedOutputs()`, 
`upsertCachedOutput()`, `invalidateWorkflowCache()` |
-| **ExecutionCacheService** | 
`/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala`
 | Event listener that registers callback for `PortMaterialized` events and 
bridges to service layer |
-| **PortMaterialized Event** | 
`/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala`
 | Client event emitted when output port completes with URI and tuple count |
+| Component                          | Location                                
                                                           | Purpose            
                                                                                
                     |
+| ---------------------------------- | 
--------------------------------------------------------------------------------------------------
 | 
-----------------------------------------------------------------------------------------------------------------------
 |
+| **OperatorPortCacheDao**     | 
`/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala`    
                 | Low-level database access using Jooq. Methods: `get()`, 
`upsert()`, `listByWorkflow()`, `deleteByWorkflow()`     |
+| **OperatorPortCacheService** | 
`/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
             | High-level cache operations. Methods: `lookupCachedOutputs()`, 
`upsertCachedOutput()`, `invalidateWorkflowCache()` |
+| **ExecutionCacheService**    | 
`/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala`
                | Event listener that registers callback for `PortMaterialized` 
events and bridges to service layer                     |
+| **PortMaterialized Event**   | 
`/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala`
 | Client event emitted when output port completes with URI and tuple count     
                                           |
 
 **Integration Points**:
+
 - **WorkflowService**: Instantiates `cacheService` at workflow level (shared 
across executions)
 - **WorkflowExecutionService**:
   - Uses `cacheService.lookupCachedOutputs()` at submission time
@@ -267,6 +305,7 @@ Phase 1.1 Service/DAO architecture is complete. Key 
components:
 **Architecture**: Event-based communication follows existing patterns 
(ExecutionStatsUpdate, WorkerAssignmentUpdate). Engine layer has zero knowledge 
of web/service layer.
 
 ### 7. Testing Strategy
+
 - **Unit tests**: Fingerprint determinism, cost model logic, region 
classification
 - **Integration tests**: Cache upsert → DB verification, cache lookup → region 
marking
 - **E2E tests**: Run workflow → populate cache → re-run → verify ToSkip 
behavior and result correctness
@@ -277,6 +316,7 @@ Phase 1.1 Service/DAO architecture is complete. Key 
components:
 ### Architecture Layers
 
 **Clean Architecture (Implemented)**:
+
 ```
 WorkflowExecutionService ──→ lookupCachedOutputs()
                             ↓
@@ -288,6 +328,7 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
 ```
 
 **Event-based communication flow**:
+
 1. `PortCompletedHandler` emits `PortMaterialized` event via `sendToClient()`
 2. `ExecutionCacheService` registers callback via 
`client.registerCallback[PortMaterialized]`
 3. Callback invokes `OperatorPortCacheService.upsertCachedOutput()`
@@ -296,6 +337,7 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
 **Clean layering**: Engine layer (PortCompletedHandler) has zero knowledge of 
web/service layer. Event-based pattern matches existing controller 
communication (ExecutionStatsUpdate, WorkerAssignmentUpdate, etc.).
 
 ### Completed Components
+
 - **Schema/migration**: `operator_port_cache` table added 
(`sql/updates/cache.sql`)
   - Columns: `workflow_id`, `global_port_id`, `subdag_hash` (PK), 
`fingerprint_json`, `result_uri`, `tuple_count`, `source_execution_id`, 
`updated_at`
   - Timestamp managed by database (`DEFAULT now()`)
@@ -324,15 +366,18 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
   - Region visualization: blue fill (`rgba(24,144,255,0.3)`) for cached 
regions in `workflow-editor.component.ts`
   - Region visibility: shared state via `WorkflowActionService.showRegion` 
ensures correct visibility when regions are created during execution
 - **Cache metadata UI** (Phase 1.3 - Complete):
-  - `CacheUsageUpdateEvent` publishes matched cached outputs for the current 
execution
+  - `CacheUsageUpdateEvent` publishes cached outputs usable by the current 
execution (fingerprint match)
   - Left panel "Cache" tab lists cache entries (physical op id, port id, tuple 
count, source execution id, updated_at, short subdag hash)
-  - Cache entries highlight when they match the current workflow fingerprint
+  - Cache entries highlight when usable by the current execution (fingerprint 
match)
+  - Cache panel can filter to only show entries usable by the current execution
   - Cached output ports show source execution id on a second label line
   - REST: `GET /executions/{wid}/cache` lists cache entries (result URI 
omitted)
   - Result URI omitted from UI payloads
 
 ### Architecture Integration
+
 The cache system integrates with three layers:
+
 1. **Execution Planning Layer**: Cache lookup at workflow submission, 
fingerprint computation
 2. **Scheduler Layer (Pasta)**: Cost-based cache-or-recompute decisions, 
region classification
 3. **Runtime Layer**: Short-circuit execution for cached regions, state 
management, stats emission
@@ -340,9 +385,11 @@ The cache system integrates with three layers:
 ## Research Contributions
 
 ### 1. Cost Model & Pruning (Primary Contribution)
+
 **Goal**: Determine when caching is beneficial and what outputs to cache.
 
 **Components**:
+
 - **Operator cost estimation**: Predict execution cost using historical 
runtime statistics
 - **Cache I/O cost**: Model read/write cost for materialized outputs
 - **Pruning heuristics**:
@@ -356,9 +403,11 @@ The cache system integrates with three layers:
 **Data source**: `runtime_statistics` table already captures execution metrics 
(data/control processing time, tuple counts, worker counts per operator).
 
 ### 2. Result Lifecycle Management (Secondary Contribution)
+
 **Goal**: Maintain cache correctness and efficiency over time.
 
 **Components**:
+
 - **Cache eviction**: When storage is limited, prioritize keeping high 
recompute-cost/storage-cost ratio entries
 - **Invalidation**: Fingerprint-based (already handles operator param changes)
 - **Source change detection**: Track external data source versions (deferred - 
hard problem)
@@ -367,6 +416,7 @@ The cache system integrates with three layers:
 **Current status**: Assumed unlimited storage. Lifecycle management tabled for 
initial implementation.
 
 **Future considerations**:
+
 - Cost-aware eviction policies (LRU/LFU variants)
 - Source versioning for common sources (files, databases)
 - Cross-workflow cache sharing (global fingerprint registry)
@@ -374,18 +424,21 @@ The cache system integrates with three layers:
 ## Evaluation Plan
 
 ### Benchmark Workloads
+
 1. **Linear pipeline**: A → B → C → D (sequential operators)
 2. **Diamond pattern**: A → B, A → C, B+C → D (shared prefix)
 3. **Multiple branches**: Complex DAGs with shared subgraphs
 4. **UDF-heavy workflows**: Expensive computation (Python/R operators)
 
 ### Metrics
+
 - **Execution time**: Full execution vs cached re-execution
 - **Cache overhead**: Fingerprint computation + DB lookup latency
 - **Storage cost**: Cache entry sizes across workload types
 - **Hit rate**: Percentage of regions served from cache
 
 ### Experiments
+
 1. **Baseline**: Execute workflow without caching
 2. **Cold cache**: First execution (populate cache, measure overhead)
 3. **Warm cache**: Re-execution (all hits, measure speedup)
@@ -393,6 +446,7 @@ The cache system integrates with three layers:
 5. **Scalability**: Vary workflow size (10, 50, 100, 200 operators)
 
 ### Comparison Baselines
+
 - Spark RDD persistence (in-memory, no cross-execution)
 - Manual materialization (user-defined intermediate saves)
 - No caching (full re-execution)
@@ -402,70 +456,75 @@ The cache system integrates with three layers:
 ### Phase 1: Complete Prototype (Engineering)
 
 #### 1.1 Refactor to Service/DAO Architecture ✓ COMPLETE
-- [x] Create `OperatorPortCacheDao` with get/upsert/delete methods
+
+- [X] Create `OperatorPortCacheDao` with get/upsert/delete methods
   - Extracted Jooq code into dedicated DAO layer
   - Defined `OperatorPortCacheRecord` case class matching database schema
   - Location: 
`/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala`
-- [x] Create `OperatorPortCacheService` with high-level methods
+- [X] Create `OperatorPortCacheService` with high-level methods
   - `lookupCachedOutputs(workflowId, physicalPlan)`: batch lookup at submission
   - `upsertCachedOutput(...)`: cache write on port completion
   - `invalidateWorkflowCache(workflowId)`: manual invalidation
   - Encapsulates fingerprint computation and serialization
   - Location: 
`/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
-- [x] Create `ExecutionCacheService` for event handling
+- [X] Create `ExecutionCacheService` for event handling
   - Registers callback for `PortMaterialized` events
   - Bridges controller events to service layer
   - Location: 
`/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala`
-- [x] Add `PortMaterialized` event type
+- [X] Add `PortMaterialized` event type
   - Location: 
`/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala`
-- [x] Refactor `WorkflowExecutionService.computeCachedOutputs()` to use service
+- [X] Refactor `WorkflowExecutionService.computeCachedOutputs()` to use service
   - Uses `OperatorPortCacheService.lookupCachedOutputs()`
-- [x] Refactor `PortCompletedHandler` to emit events
+- [X] Refactor `PortCompletedHandler` to emit events
   - Emits `PortMaterialized` event via `sendToClient()` instead of direct 
service calls
-- [x] Instantiate services in `WorkflowService` and `WorkflowExecutionService`
+- [X] Instantiate services in `WorkflowService` and `WorkflowExecutionService`
   - `cacheService` created at workflow level
   - `executionCacheService` created per execution
 - [ ] Add unit tests for DAO operations
-- [x] Add cache listing endpoint in `WorkflowExecutionsResource` (`GET 
/executions/{wid}/cache`)
+- [X] Add cache listing endpoint in `WorkflowExecutionsResource` (`GET 
/executions/{wid}/cache`)
 
 #### 1.2 Frontend Cache Visualization ✓ COMPLETE
-- [x] Add `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf 
enum
+
+- [X] Add `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf 
enum
   - Location: `/amber/src/main/protobuf/.../controlreturns.proto`
-- [x] Add `CompletedFromCache` phase to `RegionExecutionCoordinator`
+- [X] Add `CompletedFromCache` phase to `RegionExecutionCoordinator`
   - Cached regions use this phase instead of `Completed`
   - Location: 
`/amber/src/main/scala/.../scheduling/RegionExecutionCoordinator.scala`
-- [x] Update backend state conversion functions in `Utils.scala`
+- [X] Update backend state conversion functions in `Utils.scala`
   - `aggregatedStateToString`: `COMPLETED_FROM_CACHE` → `"CompletedFromCache"`
   - `stringToAggregatedState`: `"completedfromcache"` → `COMPLETED_FROM_CACHE`
   - `maptoStatusCode`: `COMPLETED_FROM_CACHE` → `6`
-- [x] Update `ExecutionUtils.aggregateStates()` to handle 
`COMPLETED_FROM_CACHE`
+- [X] Update `ExecutionUtils.aggregateStates()` to handle 
`COMPLETED_FROM_CACHE`
   - Treated as terminal state alongside `COMPLETED` and `TERMINATED`
-- [x] Add `CompletedFromCache` to frontend `OperatorState` enum
+- [X] Add `CompletedFromCache` to frontend `OperatorState` enum
   - Location: `/frontend/src/app/workspace/types/execute-workflow.interface.ts`
-- [x] Add blue fill color for cached operators in `joint-ui.service.ts`
+- [X] Add blue fill color for cached operators in `joint-ui.service.ts`
   - Color: `#1890ff` (Ant Design blue)
-- [x] Add blue fill for cached regions in `workflow-editor.component.ts`
+- [X] Add blue fill for cached regions in `workflow-editor.component.ts`
   - Color: `rgba(24,144,255,0.3)` (translucent blue)
-- [x] Show `-` for cached input counts and cached output ports without 
materialization
-- [x] Replace cached worker count label with `from cache`
-- [x] Fix region visibility with shared state via 
`WorkflowActionService.showRegion`
+- [X] Show `-` for cached input counts and cached output ports without 
materialization
+- [X] Replace cached worker count label with `from cache`
+- [X] Fix region visibility with shared state via 
`WorkflowActionService.showRegion`
   - Ensures regions show correctly when user toggles visibility before 
execution
 
 #### 1.3 Cache Metadata UI ✓ COMPLETE
-- [x] Add left panel "Cache" tab listing workflow cache entries (physical op 
id, port id, tuple count, source execution id, updated_at, short subdag hash)
-- [x] Highlight cache entries matched for the current execution fingerprint
-- [x] Show per-output-port sourceExecutionId on a second output port label line
-- [x] Re-emit cache usage snapshots on websocket connect to refresh cache 
labels after reload
-- [x] Keep result URI hidden in the UI
+
+- [X] Add left panel "Cache" tab listing workflow cache entries (physical op 
id, port id, tuple count, source execution id, updated_at, short subdag hash)
+- [x] Highlight cache entries usable by the current execution (fingerprint 
match)
+- [X] Show per-output-port sourceExecutionId on a second output port label line
+- [X] Re-emit cache usage snapshots on websocket connect to refresh cache 
labels after reload
+- [X] Keep result URI hidden in the UI
 
 #### 1.4 Testing & Validation
-- [ ] Verify downstream cached URI consumption across all operator types
+
+- [X] Verify downstream cached URI consumption across all operator types
 - [ ] Add integration tests: cache upsert → DB verification
 - [ ] Add E2E tests: run → cache → rerun → verify skip
-- [ ] Clean up state hierarchy for cached regions (confirm shallow hierarchy)
-- [ ] Verify tuple count accuracy in cache metadata
+- [X] Clean up state hierarchy for cached regions (confirm shallow hierarchy)
+- [X] Verify tuple count accuracy in cache metadata
 
 ### Phase 2: Cost Model Development (Research)
+
 - [ ] Analyze historical runtime_statistics data for cost patterns
 - [ ] Implement cost estimation for operator execution (based on historical 
stats)
 - [ ] Implement cache I/O cost model (storage backend dependent)
@@ -474,18 +533,21 @@ The cache system integrates with three layers:
 - [ ] Evaluate cache decisions: cost-based vs always-cache vs never-cache
 
 ### Phase 3: Lifecycle Management (Research - Optional)
+
 - [ ] Design cost-aware eviction policy
 - [ ] Implement storage quota management
 - [ ] Add garbage collection for deleted workflows
 - [ ] Evaluate eviction policy effectiveness
 
 ### Phase 4: Evaluation & Publication
+
 - [ ] Design and implement benchmark workloads
 - [ ] Run experiments: measure speedup, overhead, hit rates
 - [ ] Scalability analysis (varying workflow sizes)
 - [ ] Write research paper (target: SIGMOD/VLDB or workshop)
 
 ## Open Research Questions
+
 1. **Cost model accuracy**: How well can historical stats predict future 
execution costs? Does operator heterogeneity require per-operator-type models?
 2. **Reuse patterns**: Can we predict which outputs will be reused based on 
user interaction patterns?
 3. **Source change detection**: For external data sources (files, databases), 
how to efficiently detect changes without explicit versioning?
@@ -493,18 +555,22 @@ The cache system integrates with three layers:
 5. **Incremental maintenance**: When source data changes slightly, can we 
update cache incrementally vs full invalidation?
 
 ## Publication Strategy
+
 **Primary target**: Cost-aware caching framework for incremental workflow 
execution
 
 **Minimum viable contributions** (Workshop/Demo paper):
+
 - Pasta extension for cache-aware scheduling
 - Fingerprint-based correctness
 - Working prototype with evaluation
 
 **Full paper contributions** (SIGMOD/VLDB):
+
 - Above + cost model with formal problem definition
 - Cost-aware eviction (lifecycle management)
 - Comprehensive evaluation with multiple baselines
 
 **Alternative positioning**:
+
 - Demo paper: Focus on Texera integration and user experience
 - Industrial track: Deployment story with real user workloads
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
index 39783defb3..ba1cddbe7a 100644
--- 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
@@ -27,10 +27,19 @@
       (click)="refresh()">
       Refresh
     </button>
+    <label class="cache-panel__toggle">
+      <nz-switch
+        nzSize="small"
+        [(ngModel)]="showUsableOnly"
+        (ngModelChange)="updateVisibleEntries()"></nz-switch>
+      <span>Usable by this execution</span>
+    </label>
     <span
       class="cache-panel__count"
       *ngIf="cacheEntries.length">
-      {{ cacheEntries.length }} entries
+      {{ visibleEntries.length }}
+      <ng-container *ngIf="showUsableOnly">of {{ cacheEntries.length 
}}</ng-container>
+      entries
     </span>
   </div>
 
@@ -40,8 +49,8 @@
     [nzFrontPagination]="false"
     nzTableLayout="auto"
     [nzLoading]="loading"
-    *ngIf="loading || cacheEntries.length"
-    [nzData]="cacheEntries">
+    *ngIf="loading || visibleEntries.length"
+    [nzData]="visibleEntries">
     <thead>
       <tr>
         <th>Port</th>
@@ -50,8 +59,8 @@
     </thead>
     <tbody>
       <tr
-        *ngFor="let entry of cacheEntries"
-        [class.cache-match]="isMatched(entry)">
+        *ngFor="let entry of visibleEntries"
+        [class.cache-usable]="isUsableForExecution(entry)">
         <td>
           <div class="cache-port">
             <div class="cache-port__op">{{ entry.logicalOpId }}</div>
@@ -64,9 +73,9 @@
         <td>
           <div class="cache-meta">
             <nz-tag
-              *ngIf="isMatched(entry)"
+              *ngIf="isUsableForExecution(entry)"
               nzColor="blue">
-              Matched
+              Usable by this execution
             </nz-tag>
             <div>source execution: {{ 
formatSourceExecutionId(entry.sourceExecutionId) }}</div>
             <div>tuples: {{ formatTupleCount(entry.tupleCount) }}</div>
@@ -79,7 +88,7 @@
   </nz-table>
 
   <nz-empty
-    *ngIf="!loading && !cacheEntries.length"
-    nzNotFoundContent="No cache entries">
+    *ngIf="!loading && !visibleEntries.length"
+    [nzNotFoundContent]="showUsableOnly ? 'No usable cache entries' : 'No 
cache entries'">
   </nz-empty>
 </div>
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss
index 402fac73b2..d6e7a58ffb 100644
--- 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss
@@ -2,15 +2,26 @@
   display: flex;
   flex-direction: column;
   gap: 8px;
+  padding: 8px;
 }
 
 .cache-panel__actions {
   display: flex;
   align-items: center;
   gap: 8px;
+  flex-wrap: wrap;
+}
+
+.cache-panel__toggle {
+  display: inline-flex;
+  align-items: center;
+  gap: 6px;
+  color: #666;
+  font-size: 12px;
 }
 
 .cache-panel__count {
+  margin-left: auto;
   color: #666;
   font-size: 12px;
 }
@@ -32,6 +43,6 @@
   font-size: 12px;
 }
 
-tr.cache-match {
+tr.cache-usable {
   background-color: rgba(24, 144, 255, 0.08);
 }
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
index 33ca2e7463..4ed7afff98 100644
--- 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
@@ -36,6 +36,10 @@ import { CacheUsageService } from 
"../../../service/workflow-status/cache-usage.
 })
 export class CachePanelComponent implements OnInit {
   public cacheEntries: WorkflowCacheEntry[] = [];
+  /** Entries shown in the table after applying the usable-only toggle. */
+  public visibleEntries: WorkflowCacheEntry[] = [];
+  /** When true, only cache entries usable by the current execution are shown. 
*/
+  public showUsableOnly = false;
   public loading = false;
   private workflowId?: number;
   private usageKeys = new Set<string>();
@@ -60,6 +64,7 @@ export class CachePanelComponent implements OnInit {
         this.usageKeys = new Set(
           entries.map(entry => 
this.cacheUsageService.buildUsageKey(entry.globalPortId, entry.subdagHash))
         );
+        this.updateVisibleEntries();
       });
   }
 
@@ -81,16 +86,27 @@ export class CachePanelComponent implements OnInit {
       )
       .subscribe(entries => {
         this.cacheEntries = entries;
+        this.updateVisibleEntries();
       });
   }
 
   /**
-   * Returns true when a cache entry matches the current execution fingerprint.
+   * Returns true when a cache entry is usable by the current execution 
(fingerprint match),
+   * regardless of whether the scheduler chooses to reuse it.
    */
-  public isMatched(entry: WorkflowCacheEntry): boolean {
+  public isUsableForExecution(entry: WorkflowCacheEntry): boolean {
     return 
this.usageKeys.has(this.cacheUsageService.buildUsageKey(entry.globalPortId, 
entry.subdagHash));
   }
 
+  /**
+   * Updates the entries shown in the table based on the usable-only toggle.
+   */
+  public updateVisibleEntries(): void {
+    this.visibleEntries = this.showUsableOnly
+      ? this.cacheEntries.filter(entry => this.isUsableForExecution(entry))
+      : this.cacheEntries;
+  }
+
   /**
    * Formats tuple counts for display.
    */

Reply via email to