This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 9a037f52a010ff0ed90d9184624ed1e940a858c0 Author: Xiaozhen Liu <[email protected]> AuthorDate: Wed Jan 14 21:49:46 2026 -0800 feat(cache): minor improvements to cache display panel. --- docs/operator-port-cache.md | 142 +++++++++++++++------ .../cache-panel/cache-panel.component.html | 27 ++-- .../cache-panel/cache-panel.component.scss | 13 +- .../cache-panel/cache-panel.component.ts | 20 ++- 4 files changed, 152 insertions(+), 50 deletions(-) diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index 7f627755e5..0d1348c740 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -12,6 +12,7 @@ Enable **incremental workflow execution** through cost-aware caching of operator **Key principle**: The physical plan remains unchanged; cache-or-recompute decisions are made by the scheduler (Pasta / CostBasedScheduleGenerator) based on cache metadata keyed by a deterministic SHA-256 fingerprint of the upstream sub-DAG. **Research goals**: + - Extend Pasta scheduler with cost-based caching decisions - Develop cost models and pruning heuristics for what/when to cache - Evaluate speedup on iterative data science workflows @@ -20,33 +21,43 @@ Enable **incremental workflow execution** through cost-aware caching of operator ## Key Design Decisions ### 1. Physical Plan Immutability + The physical plan is never modified to accommodate caching. Cache decisions are metadata passed to the scheduler via `WorkflowSettings.cachedOutputs`. This preserves the integrity of the Pasta scheduling framework. ### 2. Fingerprint-Based Correctness + Cache keys use SHA-256 hashes of canonical upstream sub-DAG representations (operators + schemas + edges + init info). This ensures: + - Deterministic matching: same computation = same fingerprint - Automatic invalidation: any upstream change = different fingerprint = cache miss - No explicit dependency tracking needed ### 3. 
Region Homogeneity Constraint + A region is either fully cached (ToSkip) or fully executed (ToExecute) — no partial execution within a region. This simplifies: + - Scheduler logic (binary cached flag per region) - Runtime state management (consistent execution mode) - Cost model (compare full region costs) ### 4. Shallow State Hierarchy for Cached Regions + ToSkip regions create lightweight state structures (Workflow → Region → Operator/Link) and store cached metrics at the operator level. No Worker/Channel states are created, so `numWorkers=0` and no worker assignments are emitted. ### 5. Stats Emission via Direct Client Updates + Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via `asyncRPCClient.sendToClient()`, with cached operator metrics (`numWorkers=0`). This reuses existing stats infrastructure without special-casing the frontend. ### 6. Explicit Cache State in Metrics + Cached operators use a dedicated `COMPLETED_FROM_CACHE` state in `WorkflowAggregatedState` protobuf enum. This provides clear visual feedback to users and distinguishes cache-hit completion from normal execution completion. ### 7. Deferred Lifecycle Management + V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research topics for future work, not implementation blockers. ## Data Model + - Table `operator_port_cache` (PK `(workflow_id, global_port_id, subdag_hash)`): - `fingerprint_json`: canonical JSON of the upstream sub‑DAG. - `subdag_hash`: SHA-256 of `fingerprint_json`. @@ -57,6 +68,7 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research - Status: schema + migration added (`sql/updates/cache.sql`). ## Fingerprint + - Utility: `FingerprintUtil.computeSubdagFingerprint(physicalPlan, globalPortId) -> (fingerprintJson, subdagHash)`. - Canonical payload (sorted): - Target port ID. @@ -67,6 +79,7 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research ## End-to-End Workflow ### 1. 
Cache Lookup (Submission Time) + **Location**: `WorkflowExecutionService` → `OperatorPortCacheService` → `OperatorPortCacheDao` - Compile logical workflow to `PhysicalPlan` @@ -79,36 +92,44 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research - Store in `WorkflowSettings.cachedOutputs` and pass to scheduler ### 2. Scheduler Integration (Pasta / CostBasedScheduleGenerator) + **Location**: `CostBasedScheduleGenerator` **Inputs**: + - Physical plan (immutable) - `cachedOutputs` (Δ): map of cache hits from step 1 - Visible ports (☐): ports that must be materialized for user visibility **Region Classification**: + - **Homogeneity constraint**: A region is either fully cached (ToSkip) or fully executed (ToExecute) — no mixing within a region - **ToExecute regions**: Contain visible ports without cache hits OR depend on uncached intermediate materializations - **ToSkip regions**: All required output ports have cache hits AND no visible ports need fresh computation **Cost Model** (currently simple, needs refinement for research): + - Cached regions: cost = 0 - Executing regions: cost = (# operators × DEFAULT_OPERATOR_COST) + materialization read/write costs - Cache read/write: small fixed costs (0.5 per port) - **Future**: Historical stats-based estimation from `runtime_statistics` table **Output**: + - Schedule with regions marked `cached=true/false` - Port configs updated with cached URIs for ToSkip regions - Cached tuple counts reused in port metadata ### 3. Runtime Execution + **Location**: `RegionExecutionCoordinator`, `PortCompletedHandler` #### ToSkip Regions (Cached) + Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached` flag **Execution path**: + 1. **Skip operator execution**: Call `completeCachedRegion()` immediately 2. 
**State hierarchy** (shallow): - Create: Workflow → Region → Operator/Link states @@ -126,13 +147,16 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached 7. **Set phase to Completed**: Region lifecycle completes immediately #### ToExecute Regions (Normal Execution) + **Location**: `PortCompletedHandler` → `PortMaterialized event` → `ExecutionCacheService` → `OperatorPortCacheService` → `OperatorPortCacheDao` 1. **Execute operators**: Normal execution path via worker actors 2. **On output port completion** (`PortCompletedHandler`): - Retrieve result URI from `WorkflowExecutionsResource.getResultUriByPhysicalPortId` - - Retrieve tuple count (best-effort via runtime stats from worker metrics) - - Send `PortMaterialized(portId, resultUri, tupleCount)` event to client via `sendToClient()` + + - Retrieve tuple count (best-effort via runtime stats from worker metrics) + - Send `PortMaterialized(portId, resultUri, tupleCount)` event to client via `sendToClient()` + 3. **Service layer** (`ExecutionCacheService`): - Registered callback via `client.registerCallback[PortMaterialized]` receives event - Calls `OperatorPortCacheService.upsertCachedOutput(...)`: @@ -146,35 +170,43 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached **Architecture note**: Event-based communication follows existing controller pattern - handler emits events via `sendToClient()`, service layer registers callbacks to handle them. Clean separation: engine layer knows nothing about web/service layer. ### 4.
Client-Side State Management + **Location**: `ExecutionStatsService`, `ExecutionStateStore` **Stats propagation**: + - `ExecutionStatsUpdate` events (from both cached and normal regions) update `ExecutionStatsStore` - Frontend receives operator metrics via WebSocket subscription - Cached regions appear as instantly completed operators with 0 workers and 0 processing time **No structural changes needed**: + - Existing protobuf messages (`OperatorStatistics`) handle cached regions naturally - No `from_cache` flag required (can infer from numWorkers=0 + instant completion) - Lifecycle management (eviction, cleanup) deferred to future work **Cache metadata UI (Implemented)**: + - Left panel "Cache" tab listing workflow cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) -- Highlight cache entries matched for the current execution fingerprint +- Highlight cache entries usable by the current execution (fingerprint match) +- Cache panel toggle to show only entries usable by the current execution - Output port labels show tuple counts, plus a second line with source execution id for cached outputs - Result URI hidden from the UI **Cache usage updates**: -- `CacheUsageUpdateEvent` publishes matched cached outputs at submission time + +- `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint match) - Frontend uses the event to drive cache entry highlighting and per-port cache labels - Cache usage snapshots are re-emitted on websocket connect to keep labels visible after refresh ### 5. 
Service & DAO Architecture #### OperatorPortCacheDao + **Location**: `/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` Low-level database access using Jooq: + ```scala class OperatorPortCacheDao(sqlServer: SqlServer) { @@ -201,9 +233,11 @@ class OperatorPortCacheDao(sqlServer: SqlServer) { ``` #### OperatorPortCacheService + **Location**: `/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala` High-level cache operations with business logic: + ```scala class OperatorPortCacheService(dao: OperatorPortCacheDao) { @@ -232,15 +266,18 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) { ``` **Key responsibilities**: + - Encapsulates fingerprint computation (calls `FingerprintUtil`) - Handles `GlobalPortIdentity` ↔ String serialization - Manages tuple count propagation (best-effort via runtime stats) - Provides workflow-level abstractions (batch lookup, invalidation) #### WorkflowExecutionsResource (REST API - Optional) + **Location**: `/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala` HTTP endpoints for external access: + - `GET /executions/{workflowId}/cache?limit=<n>&offset=<n>`: List cache entries (result_uri omitted) - (Optional) `DELETE /cache/{workflowId}`: Manual cache invalidation @@ -250,14 +287,15 @@ HTTP endpoints for external access: Phase 1.1 Service/DAO architecture is complete. Key components: -| Component | Location | Purpose | -|-----------|----------|---------| -| **OperatorPortCacheDao** | `/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` | Low-level database access using Jooq. Methods: `get()`, `upsert()`, `listByWorkflow()`, `deleteByWorkflow()` | -| **OperatorPortCacheService** | `/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala` | High-level cache operations. 
Methods: `lookupCachedOutputs()`, `upsertCachedOutput()`, `invalidateWorkflowCache()` | -| **ExecutionCacheService** | `/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala` | Event listener that registers callback for `PortMaterialized` events and bridges to service layer | -| **PortMaterialized Event** | `/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala` | Client event emitted when output port completes with URI and tuple count | +| Component | Location | Purpose | +| ---------------------------------- | -------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------- | +| **OperatorPortCacheDao** | `/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` | Low-level database access using Jooq. Methods:`get()`, `upsert()`, `listByWorkflow()`, `deleteByWorkflow()` | +| **OperatorPortCacheService** | `/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala` | High-level cache operations. Methods:`lookupCachedOutputs()`, `upsertCachedOutput()`, `invalidateWorkflowCache()` | +| **ExecutionCacheService** | `/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala` | Event listener that registers callback for `PortMaterialized` events and bridges to service layer | +| **PortMaterialized Event** | `/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala` | Client event emitted when output port completes with URI and tuple count | **Integration Points**: + - **WorkflowService**: Instantiates `cacheService` at workflow level (shared across executions) - **WorkflowExecutionService**: - Uses `cacheService.lookupCachedOutputs()` at submission time @@ -267,6 +305,7 @@ Phase 1.1 Service/DAO architecture is complete. 
Key components: **Architecture**: Event-based communication follows existing patterns (ExecutionStatsUpdate, WorkerAssignmentUpdate). Engine layer has zero knowledge of web/service layer. ### 7. Testing Strategy + - **Unit tests**: Fingerprint determinism, cost model logic, region classification - **Integration tests**: Cache upsert → DB verification, cache lookup → region marking - **E2E tests**: Run workflow → populate cache → re-run → verify ToSkip behavior and result correctness @@ -277,6 +316,7 @@ Phase 1.1 Service/DAO architecture is complete. Key components: ### Architecture Layers **Clean Architecture (Implemented)**: + ``` WorkflowExecutionService ──→ lookupCachedOutputs() ↓ @@ -288,6 +328,7 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache ``` **Event-based communication flow**: + 1. `PortCompletedHandler` emits `PortMaterialized` event via `sendToClient()` 2. `ExecutionCacheService` registers callback via `client.registerCallback[PortMaterialized]` 3. Callback invokes `OperatorPortCacheService.upsertCachedOutput()` @@ -296,6 +337,7 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache **Clean layering**: Engine layer (PortCompletedHandler) has zero knowledge of web/service layer. Event-based pattern matches existing controller communication (ExecutionStatsUpdate, WorkerAssignmentUpdate, etc.). 
### Completed Components + - **Schema/migration**: `operator_port_cache` table added (`sql/updates/cache.sql`) - Columns: `workflow_id`, `global_port_id`, `subdag_hash` (PK), `fingerprint_json`, `result_uri`, `tuple_count`, `source_execution_id`, `updated_at` - Timestamp managed by database (`DEFAULT now()`) @@ -324,15 +366,18 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - Region visualization: blue fill (`rgba(24,144,255,0.3)`) for cached regions in `workflow-editor.component.ts` - Region visibility: shared state via `WorkflowActionService.showRegion` ensures correct visibility when regions are created during execution - **Cache metadata UI** (Phase 1.3 - Complete): - - `CacheUsageUpdateEvent` publishes matched cached outputs for the current execution + - `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint match) - Left panel "Cache" tab lists cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) - - Cache entries highlight when they match the current workflow fingerprint + - Cache entries highlight when usable by the current execution (fingerprint match) + - Cache panel can filter to only show entries usable by the current execution - Cached output ports show source execution id on a second label line - REST: `GET /executions/{wid}/cache` lists cache entries (result URI omitted) - Result URI omitted from UI payloads ### Architecture Integration + The cache system integrates with three layers: + 1. **Execution Planning Layer**: Cache lookup at workflow submission, fingerprint computation 2. **Scheduler Layer (Pasta)**: Cost-based cache-or-recompute decisions, region classification 3. **Runtime Layer**: Short-circuit execution for cached regions, state management, stats emission @@ -340,9 +385,11 @@ The cache system integrates with three layers: ## Research Contributions ### 1.
Cost Model & Pruning (Primary Contribution) + **Goal**: Determine when caching is beneficial and what outputs to cache. **Components**: + - **Operator cost estimation**: Predict execution cost using historical runtime statistics - **Cache I/O cost**: Model read/write cost for materialized outputs - **Pruning heuristics**: @@ -356,9 +403,11 @@ The cache system integrates with three layers: **Data source**: `runtime_statistics` table already captures execution metrics (data/control processing time, tuple counts, worker counts per operator). ### 2. Result Lifecycle Management (Secondary Contribution) + **Goal**: Maintain cache correctness and efficiency over time. **Components**: + - **Cache eviction**: When storage is limited, prioritize keeping high recompute-cost/storage-cost ratio entries - **Invalidation**: Fingerprint-based (already handles operator param changes) - **Source change detection**: Track external data source versions (deferred - hard problem) @@ -367,6 +416,7 @@ The cache system integrates with three layers: **Current status**: Assumed unlimited storage. Lifecycle management tabled for initial implementation. **Future considerations**: + - Cost-aware eviction policies (LRU/LFU variants) - Source versioning for common sources (files, databases) - Cross-workflow cache sharing (global fingerprint registry) @@ -374,18 +424,21 @@ The cache system integrates with three layers: ## Evaluation Plan ### Benchmark Workloads + 1. **Linear pipeline**: A → B → C → D (sequential operators) 2. **Diamond pattern**: A → B, A → C, B+C → D (shared prefix) 3. **Multiple branches**: Complex DAGs with shared subgraphs 4. **UDF-heavy workflows**: Expensive computation (Python/R operators) ### Metrics + - **Execution time**: Full execution vs cached re-execution - **Cache overhead**: Fingerprint computation + DB lookup latency - **Storage cost**: Cache entry sizes across workload types - **Hit rate**: Percentage of regions served from cache ### Experiments + 1. 
**Baseline**: Execute workflow without caching 2. **Cold cache**: First execution (populate cache, measure overhead) 3. **Warm cache**: Re-execution (all hits, measure speedup) @@ -393,6 +446,7 @@ The cache system integrates with three layers: 5. **Scalability**: Vary workflow size (10, 50, 100, 200 operators) ### Comparison Baselines + - Spark RDD persistence (in-memory, no cross-execution) - Manual materialization (user-defined intermediate saves) - No caching (full re-execution) @@ -402,70 +456,75 @@ The cache system integrates with three layers: ### Phase 1: Complete Prototype (Engineering) #### 1.1 Refactor to Service/DAO Architecture ✓ COMPLETE -- [x] Create `OperatorPortCacheDao` with get/upsert/delete methods + +- [X] Create `OperatorPortCacheDao` with get/upsert/delete methods - Extracted Jooq code into dedicated DAO layer - Defined `OperatorPortCacheRecord` case class matching database schema - Location: `/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` -- [x] Create `OperatorPortCacheService` with high-level methods +- [X] Create `OperatorPortCacheService` with high-level methods - `lookupCachedOutputs(workflowId, physicalPlan)`: batch lookup at submission - `upsertCachedOutput(...)`: cache write on port completion - `invalidateWorkflowCache(workflowId)`: manual invalidation - Encapsulates fingerprint computation and serialization - Location: `/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala` -- [x] Create `ExecutionCacheService` for event handling +- [X] Create `ExecutionCacheService` for event handling - Registers callback for `PortMaterialized` events - Bridges controller events to service layer - Location: `/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala` -- [x] Add `PortMaterialized` event type +- [X] Add `PortMaterialized` event type - Location: `/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala` -- [x] Refactor 
`WorkflowExecutionService.computeCachedOutputs()` to use service +- [X] Refactor `WorkflowExecutionService.computeCachedOutputs()` to use service - Uses `OperatorPortCacheService.lookupCachedOutputs()` -- [x] Refactor `PortCompletedHandler` to emit events +- [X] Refactor `PortCompletedHandler` to emit events - Emits `PortMaterialized` event via `sendToClient()` instead of direct service calls -- [x] Instantiate services in `WorkflowService` and `WorkflowExecutionService` +- [X] Instantiate services in `WorkflowService` and `WorkflowExecutionService` - `cacheService` created at workflow level - `executionCacheService` created per execution - [ ] Add unit tests for DAO operations -- [x] Add cache listing endpoint in `WorkflowExecutionsResource` (`GET /executions/{wid}/cache`) +- [X] Add cache listing endpoint in `WorkflowExecutionsResource` (`GET /executions/{wid}/cache`) #### 1.2 Frontend Cache Visualization ✓ COMPLETE -- [x] Add `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf enum + +- [X] Add `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf enum - Location: `/amber/src/main/protobuf/.../controlreturns.proto` -- [x] Add `CompletedFromCache` phase to `RegionExecutionCoordinator` +- [X] Add `CompletedFromCache` phase to `RegionExecutionCoordinator` - Cached regions use this phase instead of `Completed` - Location: `/amber/src/main/scala/.../scheduling/RegionExecutionCoordinator.scala` -- [x] Update backend state conversion functions in `Utils.scala` +- [X] Update backend state conversion functions in `Utils.scala` - `aggregatedStateToString`: `COMPLETED_FROM_CACHE` → `"CompletedFromCache"` - `stringToAggregatedState`: `"completedfromcache"` → `COMPLETED_FROM_CACHE` - `maptoStatusCode`: `COMPLETED_FROM_CACHE` → `6` -- [x] Update `ExecutionUtils.aggregateStates()` to handle `COMPLETED_FROM_CACHE` +- [X] Update `ExecutionUtils.aggregateStates()` to handle `COMPLETED_FROM_CACHE` - Treated as terminal state alongside `COMPLETED` and 
`TERMINATED` -- [x] Add `CompletedFromCache` to frontend `OperatorState` enum +- [X] Add `CompletedFromCache` to frontend `OperatorState` enum - Location: `/frontend/src/app/workspace/types/execute-workflow.interface.ts` -- [x] Add blue fill color for cached operators in `joint-ui.service.ts` +- [X] Add blue fill color for cached operators in `joint-ui.service.ts` - Color: `#1890ff` (Ant Design blue) -- [x] Add blue fill for cached regions in `workflow-editor.component.ts` +- [X] Add blue fill for cached regions in `workflow-editor.component.ts` - Color: `rgba(24,144,255,0.3)` (translucent blue) -- [x] Show `-` for cached input counts and cached output ports without materialization -- [x] Replace cached worker count label with `from cache` -- [x] Fix region visibility with shared state via `WorkflowActionService.showRegion` +- [X] Show `-` for cached input counts and cached output ports without materialization +- [X] Replace cached worker count label with `from cache` +- [X] Fix region visibility with shared state via `WorkflowActionService.showRegion` - Ensures regions show correctly when user toggles visibility before execution #### 1.3 Cache Metadata UI ✓ COMPLETE -- [x] Add left panel "Cache" tab listing workflow cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) -- [x] Highlight cache entries matched for the current execution fingerprint -- [x] Show per-output-port sourceExecutionId on a second output port label line -- [x] Re-emit cache usage snapshots on websocket connect to refresh cache labels after reload -- [x] Keep result URI hidden in the UI + +- [X] Add left panel "Cache" tab listing workflow cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) +- [X] Highlight cache entries usable by the current execution (fingerprint match) +- [X] Show per-output-port sourceExecutionId on a second output port label line +- [X] Re-emit cache usage snapshots on
websocket connect to refresh cache labels after reload +- [X] Keep result URI hidden in the UI #### 1.4 Testing & Validation -- [ ] Verify downstream cached URI consumption across all operator types + +- [X] Verify downstream cached URI consumption across all operator types - [ ] Add integration tests: cache upsert → DB verification - [ ] Add E2E tests: run → cache → rerun → verify skip -- [ ] Clean up state hierarchy for cached regions (confirm shallow hierarchy) -- [ ] Verify tuple count accuracy in cache metadata +- [X] Clean up state hierarchy for cached regions (confirm shallow hierarchy) +- [X] Verify tuple count accuracy in cache metadata ### Phase 2: Cost Model Development (Research) + - [ ] Analyze historical runtime_statistics data for cost patterns - [ ] Implement cost estimation for operator execution (based on historical stats) - [ ] Implement cache I/O cost model (storage backend dependent) @@ -474,18 +533,21 @@ The cache system integrates with three layers: - [ ] Evaluate cache decisions: cost-based vs always-cache vs never-cache ### Phase 3: Lifecycle Management (Research - Optional) + - [ ] Design cost-aware eviction policy - [ ] Implement storage quota management - [ ] Add garbage collection for deleted workflows - [ ] Evaluate eviction policy effectiveness ### Phase 4: Evaluation & Publication + - [ ] Design and implement benchmark workloads - [ ] Run experiments: measure speedup, overhead, hit rates - [ ] Scalability analysis (varying workflow sizes) - [ ] Write research paper (target: SIGMOD/VLDB or workshop) ## Open Research Questions + 1. **Cost model accuracy**: How well can historical stats predict future execution costs? Does operator heterogeneity require per-operator-type models? 2. **Reuse patterns**: Can we predict which outputs will be reused based on user interaction patterns? 3. **Source change detection**: For external data sources (files, databases), how to efficiently detect changes without explicit versioning? 
@@ -493,18 +555,22 @@ The cache system integrates with three layers: 5. **Incremental maintenance**: When source data changes slightly, can we update cache incrementally vs full invalidation? ## Publication Strategy + **Primary target**: Cost-aware caching framework for incremental workflow execution **Minimum viable contributions** (Workshop/Demo paper): + - Pasta extension for cache-aware scheduling - Fingerprint-based correctness - Working prototype with evaluation **Full paper contributions** (SIGMOD/VLDB): + - Above + cost model with formal problem definition - Cost-aware eviction (lifecycle management) - Comprehensive evaluation with multiple baselines **Alternative positioning**: + - Demo paper: Focus on Texera integration and user experience - Industrial track: Deployment story with real user workloads diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html index 39783defb3..ba1cddbe7a 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html @@ -27,10 +27,19 @@ (click)="refresh()"> Refresh </button> + <label class="cache-panel__toggle"> + <nz-switch + nzSize="small" + [(ngModel)]="showUsableOnly" + (ngModelChange)="updateVisibleEntries()"></nz-switch> + <span>Usable by this execution</span> + </label> <span class="cache-panel__count" *ngIf="cacheEntries.length"> - {{ cacheEntries.length }} entries + {{ visibleEntries.length }} + <ng-container *ngIf="showUsableOnly">of {{ cacheEntries.length }}</ng-container> + entries </span> </div> @@ -40,8 +49,8 @@ [nzFrontPagination]="false" nzTableLayout="auto" [nzLoading]="loading" - *ngIf="loading || cacheEntries.length" - [nzData]="cacheEntries"> + *ngIf="loading || visibleEntries.length" + [nzData]="visibleEntries"> <thead> <tr> <th>Port</th> @@ -50,8 
+59,8 @@ </thead> <tbody> <tr - *ngFor="let entry of cacheEntries" - [class.cache-match]="isMatched(entry)"> + *ngFor="let entry of visibleEntries" + [class.cache-usable]="isUsableForExecution(entry)"> <td> <div class="cache-port"> <div class="cache-port__op">{{ entry.logicalOpId }}</div> @@ -64,9 +73,9 @@ <td> <div class="cache-meta"> <nz-tag - *ngIf="isMatched(entry)" + *ngIf="isUsableForExecution(entry)" nzColor="blue"> - Matched + Usable by this execution </nz-tag> <div>source execution: {{ formatSourceExecutionId(entry.sourceExecutionId) }}</div> <div>tuples: {{ formatTupleCount(entry.tupleCount) }}</div> @@ -79,7 +88,7 @@ </nz-table> <nz-empty - *ngIf="!loading && !cacheEntries.length" - nzNotFoundContent="No cache entries"> + *ngIf="!loading && !visibleEntries.length" + [nzNotFoundContent]="showUsableOnly ? 'No usable cache entries' : 'No cache entries'"> </nz-empty> </div> diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss index 402fac73b2..d6e7a58ffb 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss @@ -2,15 +2,26 @@ display: flex; flex-direction: column; gap: 8px; + padding: 8px; } .cache-panel__actions { display: flex; align-items: center; gap: 8px; + flex-wrap: wrap; +} + +.cache-panel__toggle { + display: inline-flex; + align-items: center; + gap: 6px; + color: #666; + font-size: 12px; } .cache-panel__count { + margin-left: auto; color: #666; font-size: 12px; } @@ -32,6 +43,6 @@ font-size: 12px; } -tr.cache-match { +tr.cache-usable { background-color: rgba(24, 144, 255, 0.08); } diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts index 
33ca2e7463..4ed7afff98 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts @@ -36,6 +36,10 @@ import { CacheUsageService } from "../../../service/workflow-status/cache-usage. }) export class CachePanelComponent implements OnInit { public cacheEntries: WorkflowCacheEntry[] = []; + /** Entries shown in the table after applying the usable-only toggle. */ + public visibleEntries: WorkflowCacheEntry[] = []; + /** When true, only cache entries usable by the current execution are shown. */ + public showUsableOnly = false; public loading = false; private workflowId?: number; private usageKeys = new Set<string>(); @@ -60,6 +64,7 @@ export class CachePanelComponent implements OnInit { this.usageKeys = new Set( entries.map(entry => this.cacheUsageService.buildUsageKey(entry.globalPortId, entry.subdagHash)) ); + this.updateVisibleEntries(); }); } @@ -81,16 +86,27 @@ export class CachePanelComponent implements OnInit { ) .subscribe(entries => { this.cacheEntries = entries; + this.updateVisibleEntries(); }); } /** - * Returns true when a cache entry matches the current execution fingerprint. + * Returns true when a cache entry is usable by the current execution (fingerprint match), + * regardless of whether the scheduler chooses to reuse it. */ - public isMatched(entry: WorkflowCacheEntry): boolean { + public isUsableForExecution(entry: WorkflowCacheEntry): boolean { return this.usageKeys.has(this.cacheUsageService.buildUsageKey(entry.globalPortId, entry.subdagHash)); } + /** + * Updates the entries shown in the table based on the usable-only toggle. + */ + public updateVisibleEntries(): void { + this.visibleEntries = this.showUsableOnly + ? this.cacheEntries.filter(entry => this.isUsableForExecution(entry)) + : this.cacheEntries; + } + /** * Formats tuple counts for display. */
