This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 16dfe97b7adc7f80b41e0eb0246f75976ba7a790 Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Jan 12 15:42:02 2026 -0800 feat(cache): update design doc. --- docs/operator-port-cache.md | 670 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 630 insertions(+), 40 deletions(-) diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index 5fc000c583..cc1d760437 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -1,7 +1,50 @@ # Operator Port Result Cache Design ## Objective -Reuse previously materialized operator output ports without modifying the physical plan. The physical plan remains immutable; reuse decisions are made by the scheduler (Pasta / CostBasedScheduleGenerator) based on cache metadata keyed by a deterministic fingerprint of the upstream sub‑DAG. + +Enable **incremental workflow execution** through cost-aware caching of operator output ports. When users iteratively refine workflows (modify parameters, add/remove operators), the system should: + +1. **Reuse cached results** where upstream computation is unchanged +2. **Make cost-aware decisions** comparing cache read cost vs recomputation cost +3. **Maintain correctness** via deterministic fingerprinting of upstream sub-DAGs +4. **Preserve physical plan immutability** — caching is scheduler metadata, not plan modification + +**Key principle**: The physical plan remains unchanged; cache-or-recompute decisions are made by the scheduler (Pasta / CostBasedScheduleGenerator) based on cache metadata keyed by a deterministic SHA-256 fingerprint of the upstream sub-DAG. + +**Research goals**: +- Extend Pasta scheduler with cost-based caching decisions +- Develop cost models and pruning heuristics for what/when to cache +- Evaluate speedup on iterative data science workflows +- (Optional) Lifecycle management: eviction, invalidation, garbage collection + +## Key Design Decisions + +### 1. 
Physical Plan Immutability +The physical plan is never modified to accommodate caching. Cache decisions are metadata passed to the scheduler via `WorkflowSettings.cachedOutputs`. This preserves the integrity of the Pasta scheduling framework. + +### 2. Fingerprint-Based Correctness +Cache keys use SHA-256 hashes of canonical upstream sub-DAG representations (operators + schemas + edges + init info). This ensures: +- Deterministic matching: same computation = same fingerprint +- Automatic invalidation: any upstream change = different fingerprint = cache miss +- No explicit dependency tracking needed + +### 3. Region Homogeneity Constraint +A region is either fully cached (ToSkip) or fully executed (ToExecute) — no partial execution within a region. This simplifies: +- Scheduler logic (binary cached flag per region) +- Runtime state management (consistent execution mode) +- Cost model (compare full region costs) + +### 4. Shallow State Hierarchy for Cached Regions +ToSkip regions create lightweight state structures (Workflow → Region → Operator/Link) without Worker/Channel states. This is consistent with `numWorkers=0` and avoids synthetic worker lifecycle management. + +### 5. Stats Emission via Direct Client Updates +Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via `asyncRPCClient.sendToClient()`. This reuses existing stats infrastructure without special-casing the frontend. + +### 6. No Explicit Cache Flag in Metrics +Cached execution is inferred from `numWorkers=0` + instant completion, rather than adding a `from_cache` flag to protobuf messages. This minimizes protocol changes. + +### 7. Deferred Lifecycle Management +V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research topics for future work, not implementation blockers. 
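The fingerprint-based cache key from design decision 2 can be illustrated with a minimal, self-contained sketch. It is not the project's `FingerprintUtil`: `OpNode`, `Edge`, `upstreamOf`, `canonicalize`, and `subdagHash` are hypothetical names, the canonical form is a plain delimited string rather than the JSON payload the design describes, and schemas and init info are omitted. It only shows why hashing a sorted, upstream-only representation gives deterministic matching and automatic invalidation.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object FingerprintSketch {
  // Hypothetical, simplified stand-ins for physical operators and links.
  final case class OpNode(id: String, params: Map[String, String])
  final case class Edge(from: String, to: String)

  /** Ids of `target` plus every operator it transitively depends on. */
  def upstreamOf(target: String, edges: Seq[Edge]): Set[String] = {
    @scala.annotation.tailrec
    def loop(frontier: Set[String], seen: Set[String]): Set[String] =
      if (frontier.isEmpty) seen
      else {
        val next = edges.filter(e => frontier.contains(e.to)).map(_.from).toSet -- seen
        loop(next, seen ++ next)
      }
    loop(Set(target), Set(target))
  }

  /** Order-independent text form of the upstream sub-DAG (sorted operators, params, edges). */
  def canonicalize(target: String, ops: Seq[OpNode], edges: Seq[Edge]): String = {
    val keep = upstreamOf(target, edges)
    val opPart = ops.filter(o => keep(o.id)).sortBy(_.id).map { o =>
      val ps = o.params.toSeq.sorted.map { case (k, v) => s"$k=$v" }.mkString(",")
      s"${o.id}(${ps})"
    }.mkString(";")
    val edgePart = edges
      .filter(e => keep(e.from) && keep(e.to))
      .map(e => s"${e.from}->${e.to}")
      .sorted
      .mkString(";")
    s"target=$target|ops=$opPart|edges=$edgePart"
  }

  /** Lowercase hex SHA-256 of the canonical form; plays the role of `subdag_hash`. */
  def subdagHash(target: String, ops: Seq[OpNode], edges: Seq[Edge]): String =
    MessageDigest
      .getInstance("SHA-256")
      .digest(canonicalize(target, ops, edges).getBytes(StandardCharsets.UTF_8))
      .map(b => "%02x".format(b & 0xff))
      .mkString
}
```

Under these assumptions, changing a parameter on an operator downstream of the target port leaves the hash untouched, while changing anything upstream produces a different hash and therefore a cache miss. A real canonical form would also need to escape delimiters or use structured JSON so that distinct sub-DAGs cannot serialize to the same string.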
## Data Model - Table `operator_port_cache` (PK `(workflow_id, global_port_id, subdag_hash)`): @@ -21,42 +64,589 @@ Reuse previously materialized operator output ports without modifying the physic - Hash: SHA-256 of the payload JSON. ## End-to-End Workflow -1) **Lookup before execution (mark Δ ports)** - - Compile to `PhysicalPlan`. - - For each materializable output port (internal/external), compute fingerprint. - - Query `operator_port_cache` by `(workflow_id, global_port_id, subdag_hash)`; collect hits into `cachedOutputs`. - - `WorkflowSettings` carries `cachedOutputs` keyed by serialized `GlobalPortIdentity` to avoid custom map key deserialization. - -2) **Scheduler integration (Pasta)** - - Inputs: physical plan, `cachedOutputs` (Δ), visible ports (☐). - - Classify regions: `must-execute` if they contain visible ports without cache or depend on uncached materializations; remaining regions are `cached`. - - Cost model: cached regions cost 0; executing operators >0; materialization reads/writes small fixed costs. - - Schedule marks cached vs must-execute; runtime skips cached regions and uses cached URIs for materialized reads. - -3) **Runtime behavior** - - Cached regions: skip operator execution; mark operators completed; ensure downstream readers use cached `result_uri`. - - Must-execute regions: run normally; on output port completion, compute fingerprint and upsert `operator_port_cache` with hash/fingerprint/URI (tuple count if available). - -4) **API/Helpers** - - `WorkflowExecutionsResource`: `getResultUriByPhysicalPortId`, `upsertOperatorPortCache`. - - Optional cache DAO/service wrapper for cleaner calls. - -5) **Testing** - - Fingerprint determinism/change detection. - - Cache lookup integration (insert + retrieve by hash). - - Region classification tests for Δ/☐ combinations. - - End-to-end: run → populate cache → rerun → verify cached regions are skipped and results served from cache. - -## Current Progress (checkpoint) -- Schema/migration added. 
-- `FingerprintUtil` implemented and covered with workflow-based specs. -- Submission-time cache lookup wired: `WorkflowExecutionService` fingerprints all physical output ports, queries cache, and stores hits in `WorkflowSettings.cachedOutputs` (keyed by serialized `GlobalPortIdentity`). -- Cache upsert on output port completion in `PortCompletedHandler` (guarded on plan + URI; tuple count best-effort). -- `WorkflowExecutionsResource` exposes lookup/upsert helpers (`getResultUriByPhysicalPortId`, `getOperatorPortCache`, `upsertOperatorPortCache`); cache maps currently use stringified port IDs to avoid Jackson map key serde issues. -- Scheduler/runtime path now reuses cached materializations: `CostBasedScheduleGenerator` marks regions cached when all required outputs have cache hits, reuses cached URIs (and cached tuple counts) in port configs, and cached regions emit stats with cached counts; `WorkflowExecutionCoordinator`/`RegionExecutionCoordinator` short-circuit cached regions, mark ports/workers completed, emit stats, and propagate cached URIs downstream. Completion notification remains aligned with the normal [...] - -## Next Actions -- Refine region classification/cost model (Δ/☐ rules) and ensure cached vs must-execute decisions align with visibility/materialization needs. -- Tighten runtime semantics: double-check downstream consumption of cached URIs and UI exposure for visible ports; consider marking worker/region states for observability. -- Refine cache upsert if needed (tuple count accuracy, avoid duplicate writes). -- Add integration and end-to-end tests for cache lookup/reuse paths and scheduler decisions. + +### 1. 
Cache Lookup (Submission Time) +**Location**: `WorkflowExecutionService` → `OperatorPortCacheService` → `OperatorPortCacheDao` + +- Compile logical workflow to `PhysicalPlan` +- Call `OperatorPortCacheService.lookupCachedOutputs(workflowId, physicalPlan)`: + - For each materializable output port (internal/external): + - Compute fingerprint via `FingerprintUtil.computeSubdagFingerprint(physicalPlan, globalPortId)` + - Query `operator_port_cache` table via DAO by `(workflow_id, global_port_id, subdag_hash)` + - Return `Map[GlobalPortIdentity, CachedOutput]` with all cache hits +- Convert to serialized form: `Map[String, CachedOutput]` (keyed by serialized `GlobalPortIdentity` to avoid Jackson map key deserialization issues) +- Store in `WorkflowSettings.cachedOutputs` and pass to scheduler + +### 2. Scheduler Integration (Pasta / CostBasedScheduleGenerator) +**Location**: `CostBasedScheduleGenerator` + +**Inputs**: +- Physical plan (immutable) +- `cachedOutputs` (Δ): map of cache hits from step 1 +- Visible ports (☐): ports that must be materialized for user visibility + +**Region Classification**: +- **Homogeneity constraint**: A region is either fully cached (ToSkip) or fully executed (ToExecute) — no mixing within a region +- **ToExecute regions**: Contain visible ports without cache hits OR depend on uncached intermediate materializations +- **ToSkip regions**: All required output ports have cache hits AND no visible ports need fresh computation + +**Cost Model** (currently simple, needs refinement for research): +- Cached regions: cost = 0 +- Executing regions: cost = (# operators × DEFAULT_OPERATOR_COST) + materialization read/write costs +- Cache read/write: small fixed costs (0.5 per port) +- **Future**: Historical stats-based estimation from `runtime_statistics` table + +**Output**: +- Schedule with regions marked `cached=true/false` +- Port configs updated with cached URIs for ToSkip regions +- Cached tuple counts reused in port metadata + +### 3. 
Runtime Execution +**Location**: `RegionExecutionCoordinator`, `PortCompletedHandler` + +#### ToSkip Regions (Cached) +Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached` flag + +**Execution path**: +1. **Skip operator execution**: Call `completeCachedRegion()` immediately +2. **State hierarchy** (shallow): + - Create: Workflow → Region → Operator/Link states + - Skip: Worker/Channel states (not needed) +3. **Mark ports completed**: Set port status to COMPLETED with cached URI +4. **Emit synthetic stats** via `asyncRPCClient.sendToClient(ExecutionStatsUpdate(...))`: + - `numWorkers = 0` + - `dataProcessingTime = 0`, `controlProcessingTime = 0`, `idleTime = 0` + - Input/output tuple counts from cached metadata +5. **Propagate cached URIs**: Downstream operators receive cached `result_uri` for materialized inputs +6. **No WorkerAssignmentUpdate**: Cached regions don't send worker assignment events (consistent with numWorkers=0) +7. **Set phase to Completed**: Region lifecycle completes immediately + +#### ToExecute Regions (Normal Execution) +**Location**: `PortCompletedHandler` → `OperatorPortCacheService` → `OperatorPortCacheDao` + +1. **Execute operators**: Normal execution path via worker actors +2. **On output port completion** (`PortCompletedHandler`): + - Call `OperatorPortCacheService.upsertCachedOutput(...)`: + - Compute fingerprint via `FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId)` + - Retrieve tuple count (best-effort via `DocumentFactory.openDocument(uri).getCount`) + - Upsert to `operator_port_cache` table via DAO with: + - `workflow_id`, `global_port_id`, `subdag_hash` + - `fingerprint_json`, `result_uri` + - `tuple_count`, `source_execution_id`, timestamps +3. **Normal stats emission**: Real execution metrics sent to client + +### 4. 
Client-Side State Management
+**Location**: `ExecutionStatsService`, `ExecutionStateStore`
+
+**Stats propagation**:
+- `ExecutionStatsUpdate` events (from both cached and normal regions) update `ExecutionStatsStore`
+- Frontend receives operator metrics via WebSocket subscription
+- Cached regions appear as instantly completed operators with 0 workers and 0 processing time
+
+**No structural changes needed**:
+- Existing protobuf messages (`OperatorStatistics`) handle cached regions naturally
+- No `from_cache` flag required (can infer from numWorkers=0 + instant completion)
+- Lifecycle management (eviction, cleanup) deferred to future work
+
+### 5. Service & DAO Architecture
+
+#### OperatorPortCacheDao
+**Location**: `/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala`
+
+Low-level database access using Jooq:
+```scala
+class OperatorPortCacheDao(sqlServer: SqlServer) {
+
+  /** Query cache entry by PK (workflow_id, global_port_id, subdag_hash) */
+  def get(
+    workflowId: Long,
+    serializedPortId: String,
+    subdagHash: String
+  ): Option[OperatorPortCacheRecord]
+
+  /** Upsert cache entry (insert or update on conflict) */
+  def upsert(record: OperatorPortCacheRecord): Unit
+
+  /** Delete all cache entries for a workflow */
+  def deleteByWorkflow(workflowId: Long): Unit
+}
+```
+
+#### OperatorPortCacheService
+**Location**: `/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
+
+High-level cache operations with business logic:
+```scala
+class OperatorPortCacheService(dao: OperatorPortCacheDao) {
+
+  /** Lookup cached outputs for all ports in a physical plan (submission time) */
+  def lookupCachedOutputs(
+    workflowId: WorkflowIdentity,
+    physicalPlan: PhysicalPlan
+  ): Map[GlobalPortIdentity, CachedOutput]
+
+  /** Upsert cache entry when output port completes (runtime) */
+  def upsertCachedOutput(
+    workflowId: WorkflowIdentity,
+    executionId: ExecutionIdentity,
+    portId: GlobalPortIdentity,
+    physicalPlan: PhysicalPlan,
+    resultUri: URI,
+    tupleCount: Option[Long]
+  ): Unit
+
+  /** Invalidate all cache entries for a workflow (lifecycle management) */
+  def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit
+
+  /** Future: Cost-aware eviction when storage quota exceeded */
+  def evictLowValueEntries(quotaBytes: Long): Unit
+}
+```
+
+**Key responsibilities**:
+- Encapsulates fingerprint computation (calls `FingerprintUtil`)
+- Handles `GlobalPortIdentity` ↔ String serialization
+- Manages tuple count retrieval (best-effort via `DocumentFactory`)
+- Provides workflow-level abstractions (batch lookup, invalidation)
+
+#### WorkflowExecutionsResource (REST API - Optional)
+**Location**: `/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala`
+
+HTTP endpoints for external access (if needed):
+- `GET /cache/{workflowId}/{portId}/{hash}`: Manual cache lookup
+- `DELETE /cache/{workflowId}`: Manual cache invalidation
+
+**Note**: Internal services use `OperatorPortCacheService`, not the REST resource.
+
+### 6.
Implementation Guide
+
+#### Step 1: Create OperatorPortCacheDao
+**File**: `/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala`
+
+```scala
+package org.apache.texera.web.dao
+
+import org.apache.texera.amber.core.virtualidentity.ExecutionIdentity
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_CACHE
+import org.jooq.DSLContext
+
+import java.net.URI
+import scala.jdk.OptionConverters._
+
+case class OperatorPortCacheRecord(
+  workflowId: Long,
+  globalPortId: String,
+  subdagHash: String,
+  fingerprintJson: String,
+  resultUri: URI,
+  tupleCount: Option[Long],
+  sourceExecutionId: Option[Long],
+  createdAt: Long,
+  updatedAt: Long
+)
+
+class OperatorPortCacheDao(sqlServer: SqlServer) {
+  private val context: DSLContext = sqlServer.createDSLContext()
+
+  def get(
+    workflowId: Long,
+    serializedPortId: String,
+    subdagHash: String
+  ): Option[OperatorPortCacheRecord] = {
+    context
+      .select(
+        OPERATOR_PORT_CACHE.WORKFLOW_ID,
+        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
+        OPERATOR_PORT_CACHE.SUBDAG_HASH,
+        OPERATOR_PORT_CACHE.FINGERPRINT_JSON,
+        OPERATOR_PORT_CACHE.RESULT_URI,
+        OPERATOR_PORT_CACHE.TUPLE_COUNT,
+        OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID,
+        OPERATOR_PORT_CACHE.CREATED_AT,
+        OPERATOR_PORT_CACHE.UPDATED_AT
+      )
+      .from(OPERATOR_PORT_CACHE)
+      .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt))
+      .and(OPERATOR_PORT_CACHE.GLOBAL_PORT_ID.eq(serializedPortId))
+      .and(OPERATOR_PORT_CACHE.SUBDAG_HASH.eq(subdagHash))
+      .fetchOptional()
+      .toScala
+      .map { record =>
+        OperatorPortCacheRecord(
+          workflowId = record.value1().longValue(),
+          globalPortId = record.value2(),
+          subdagHash = record.value3(),
+          fingerprintJson = record.value4(),
+          resultUri = URI.create(record.value5()),
+          tupleCount = Option(record.value6()).map(_.longValue()),
+          sourceExecutionId = Option(record.value7()).map(_.longValue()),
+          createdAt = record.value8().longValue(),
+          updatedAt = record.value9().longValue()
+        )
+      }
+  }
+
+  def upsert(record: OperatorPortCacheRecord): Unit = {
+    val dbRecord = context.newRecord(OPERATOR_PORT_CACHE)
+    dbRecord.setWorkflowId(record.workflowId.toInt)
+    dbRecord.setGlobalPortId(record.globalPortId)
+    dbRecord.setSubdagHash(record.subdagHash)
+    dbRecord.setFingerprintJson(record.fingerprintJson)
+    dbRecord.setResultUri(record.resultUri.toString)
+    record.tupleCount.foreach(c => dbRecord.setTupleCount(Long.box(c)))
+    record.sourceExecutionId.foreach(eid => dbRecord.setSourceExecutionId(Long.box(eid)))
+
+    context
+      .insertInto(OPERATOR_PORT_CACHE)
+      .set(dbRecord)
+      .onConflict(
+        OPERATOR_PORT_CACHE.WORKFLOW_ID,
+        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
+        OPERATOR_PORT_CACHE.SUBDAG_HASH
+      )
+      .doUpdate()
+      .set(OPERATOR_PORT_CACHE.RESULT_URI, dbRecord.getResultUri)
+      .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, dbRecord.getFingerprintJson)
+      .set(OPERATOR_PORT_CACHE.TUPLE_COUNT, dbRecord.getTupleCount)
+      .set(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, dbRecord.getSourceExecutionId)
+      .execute()
+  }
+
+  def deleteByWorkflow(workflowId: Long): Unit = {
+    context
+      .deleteFrom(OPERATOR_PORT_CACHE)
+      .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt))
+      .execute()
+  }
+}
+```
+
+#### Step 2: Create OperatorPortCacheService
+**File**: `/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
+
+```scala
+package org.apache.texera.web.service
+
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.cache.FingerprintUtil
+import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity, PhysicalPlan}
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
+import org.apache.texera.web.dao.{OperatorPortCacheDao, OperatorPortCacheRecord}
+
+import java.net.URI
+
+class OperatorPortCacheService(dao: OperatorPortCacheDao) {
+
+  /**
+   * Lookup cached outputs for all materializable ports in the physical plan.
+   * Called at workflow submission time.
+   */
+  def lookupCachedOutputs(
+    workflowId: WorkflowIdentity,
+    physicalPlan: PhysicalPlan
+  ): Map[GlobalPortIdentity, CachedOutput] = {
+    physicalPlan.operators
+      .flatMap(op => op.outputPorts.keys.map(pid => GlobalPortIdentity(op.id, pid)))
+      .flatMap { gpid =>
+        val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, gpid)
+        dao.get(workflowId.id, gpid.serializeAsString, fingerprint.subdagHash).map { record =>
+          gpid -> CachedOutput(
+            resultUri = record.resultUri,
+            fingerprintJson = record.fingerprintJson,
+            tupleCount = record.tupleCount,
+            sourceExecutionId = record.sourceExecutionId.map(ExecutionIdentity(_))
+          )
+        }
+      }
+      .toMap
+  }
+
+  /**
+   * Upsert cache entry when an output port completes.
+   * Called by PortCompletedHandler at runtime.
+   */
+  def upsertCachedOutput(
+    workflowId: WorkflowIdentity,
+    executionId: ExecutionIdentity,
+    portId: GlobalPortIdentity,
+    physicalPlan: PhysicalPlan,
+    resultUri: URI,
+    tupleCount: Option[Long]
+  ): Unit = {
+    val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId)
+    val now = System.currentTimeMillis()
+
+    dao.upsert(
+      OperatorPortCacheRecord(
+        workflowId = workflowId.id,
+        globalPortId = portId.serializeAsString,
+        subdagHash = fingerprint.subdagHash,
+        fingerprintJson = fingerprint.fingerprintJson,
+        resultUri = resultUri,
+        tupleCount = tupleCount,
+        sourceExecutionId = Some(executionId.id),
+        createdAt = now,
+        updatedAt = now
+      )
+    )
+  }
+
+  /**
+   * Invalidate all cache entries for a workflow.
+   * Useful for manual cache clearing or workflow deletion.
+   */
+  def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit = {
+    dao.deleteByWorkflow(workflowId.id)
+  }
+
+  /**
+   * Future: Cost-aware eviction when storage quota is exceeded.
+   */
+  def evictLowValueEntries(quotaBytes: Long): Unit = {
+    // Phase 3: Lifecycle management
+    throw new UnsupportedOperationException("Not yet implemented")
+  }
+}
+```
+
+#### Step 3: Update WorkflowExecutionService
+**File**: `/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala`
+
+```scala
+class WorkflowExecutionService(
+  controllerConfig: ControllerConfig,
+  val workflowContext: WorkflowContext,
+  resultService: ExecutionResultService,
+  cacheService: OperatorPortCacheService, // INJECT HERE
+  request: WorkflowExecuteRequest,
+  val executionStateStore: ExecutionStateStore,
+  errorHandler: Throwable => Unit,
+  userEmailOpt: Option[String],
+  sessionUri: URI
+) extends SubscriptionManager with LazyLogging {
+
+  def executeWorkflow(): Unit = {
+    try {
+      workflow = new WorkflowCompiler(workflowContext).compile(request.logicalPlan)
+
+      // Use cache service for lookup
+      val cachedOutputs = cacheService
+        .lookupCachedOutputs(workflowContext.workflowId, workflow.physicalPlan)
+        .map { case (gpid, cached) => gpid.serializeAsString -> cached }
+
+      workflowContext.workflowSettings =
+        workflowContext.workflowSettings.copy(cachedOutputs = cachedOutputs)
+    } catch {
+      case err: Throwable => errorHandler(err)
+    }
+    // ... rest of method
+  }
+}
+```
+
+#### Step 4: Update PortCompletedHandler
+**File**: `/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala`
+
+```scala
+// Inject OperatorPortCacheService into controller/handler
+
+(storageUriOpt, Option(cp.workflowScheduler.physicalPlan)) match {
+  case (Some(uri), Some(plan)) =>
+    val tupleCount = try {
+      Some(DocumentFactory.openDocument(uri)._1.getCount)
+    } catch {
+      case _: Throwable => None
+    }
+
+    // Use cache service for upsert
+    cacheService.upsertCachedOutput(
+      cp.workflowContext.workflowId,
+      cp.workflowContext.executionId,
+      globalPortId,
+      plan,
+      uri,
+      tupleCount
+    )
+  case _ => // no-op
+}
+```
+
+### 7.
Testing Strategy
+- **Unit tests**: Fingerprint determinism, cost model logic, region classification
+- **Integration tests**: Cache upsert → DB verification, cache lookup → region marking
+- **E2E tests**: Run workflow → populate cache → re-run → verify ToSkip behavior and result correctness
+- **Change detection**: Modify operator params → verify fingerprint mismatch → cache miss
+
+## Current Implementation Status
+
+### Architecture Layers
+
+**Current (Prototype)**:
+```
+WorkflowExecutionService ───┐
+                            ├──→ WorkflowExecutionsResource (Jooq)
+PortCompletedHandler ───────┘
+```
+
+**Proposed (Clean Architecture)**:
+```
+WorkflowExecutionService ───┐
+                            ├──→ OperatorPortCacheService ──→ OperatorPortCacheDao (Jooq)
+PortCompletedHandler ───────┤
+                            │
+WorkflowExecutionsResource ─┘ (optional REST endpoints)
+```
+
+**Refactoring needed**:
+1. Extract Jooq code from `WorkflowExecutionsResource` → `OperatorPortCacheDao`
+2. Create `OperatorPortCacheService` with workflow-level abstractions
+3.
Update service call sites to use `OperatorPortCacheService` + +### Completed Components +- **Schema/migration**: `operator_port_cache` table added (`sql/texera_ddl.sql`, `sql/updates/16.sql`) +- **Fingerprinting**: `FingerprintUtil` implemented with workflow-based specs for deterministic subDAG hashing +- **Submission-time lookup**: `WorkflowExecutionService` computes fingerprints for all physical output ports, queries cache, stores hits in `WorkflowSettings.cachedOutputs` +- **Cache persistence**: `PortCompletedHandler` upserts to `operator_port_cache` on output port completion (includes fingerprint, URI, tuple count) +- **API layer**: `WorkflowExecutionsResource` exposes cache lookup/upsert helpers +- **Scheduler integration**: `CostBasedScheduleGenerator` marks regions cached when all required outputs have hits, reuses cached URIs in port configs +- **Runtime execution**: `RegionExecutionCoordinator` branches on `region.cached` flag: + - ToSkip regions: `completeCachedRegion()` creates shallow state hierarchy, emits synthetic stats (numWorkers=0, processingTime=0), propagates cached URIs downstream + - ToExecute regions: normal execution path +- **Stats emission**: Cached regions emit `ExecutionStatsUpdate` via direct client updates, maintaining consistency with normal execution lifecycle + +### Architecture Integration +The cache system integrates with three layers: +1. **Execution Planning Layer**: Cache lookup at workflow submission, fingerprint computation +2. **Scheduler Layer (Pasta)**: Cost-based cache-or-recompute decisions, region classification +3. **Runtime Layer**: Short-circuit execution for cached regions, state management, stats emission + +## Research Contributions + +### 1. Cost Model & Pruning (Primary Contribution) +**Goal**: Determine when caching is beneficial and what outputs to cache. 
+ +**Components**: +- **Operator cost estimation**: Predict execution cost using historical runtime statistics +- **Cache I/O cost**: Model read/write cost for materialized outputs +- **Pruning heuristics**: + - Skip caching small outputs (recompute is cheap) + - Skip terminal operators (no downstream reuse) + - Skip low-reuse-probability operators +- **Cache-or-recompute decision**: Per-region cost comparison (cache read vs recompute) + +**Current status**: Simple cost model (cached=0, execute>0). Need to develop historical stats-based estimation. + +**Data source**: `runtime_statistics` table already captures execution metrics (data/control processing time, tuple counts, worker counts per operator). + +### 2. Result Lifecycle Management (Secondary Contribution) +**Goal**: Maintain cache correctness and efficiency over time. + +**Components**: +- **Cache eviction**: When storage is limited, prioritize keeping high recompute-cost/storage-cost ratio entries +- **Invalidation**: Fingerprint-based (already handles operator param changes) +- **Source change detection**: Track external data source versions (deferred - hard problem) +- **Garbage collection**: Clean up cache entries for deleted workflows/executions + +**Current status**: Assumed unlimited storage. Lifecycle management tabled for initial implementation. + +**Future considerations**: +- Cost-aware eviction policies (LRU/LFU variants) +- Source versioning for common sources (files, databases) +- Cross-workflow cache sharing (global fingerprint registry) + +## Evaluation Plan + +### Benchmark Workloads +1. **Linear pipeline**: A → B → C → D (sequential operators) +2. **Diamond pattern**: A → B, A → C, B+C → D (shared prefix) +3. **Multiple branches**: Complex DAGs with shared subgraphs +4. 
**UDF-heavy workflows**: Expensive computation (Python/R operators) + +### Metrics +- **Execution time**: Full execution vs cached re-execution +- **Cache overhead**: Fingerprint computation + DB lookup latency +- **Storage cost**: Cache entry sizes across workload types +- **Hit rate**: Percentage of regions served from cache + +### Experiments +1. **Baseline**: Execute workflow without caching +2. **Cold cache**: First execution (populate cache, measure overhead) +3. **Warm cache**: Re-execution (all hits, measure speedup) +4. **Partial invalidation**: Modify one operator, re-execute (measure partial hit benefit) +5. **Scalability**: Vary workflow size (10, 50, 100, 200 operators) + +### Comparison Baselines +- Spark RDD persistence (in-memory, no cross-execution) +- Manual materialization (user-defined intermediate saves) +- No caching (full re-execution) + +## Next Steps + +### Phase 1: Complete Prototype (Engineering) + +#### 1.1 Refactor to Service/DAO Architecture +- [ ] Create `OperatorPortCacheDao` with get/upsert/delete methods + - Extract Jooq code from `WorkflowExecutionsResource` + - Define `OperatorPortCacheRecord` case class + - Add unit tests for DAO operations +- [ ] Create `OperatorPortCacheService` with high-level methods + - `lookupCachedOutputs(workflowId, physicalPlan)`: batch lookup at submission + - `upsertCachedOutput(...)`: cache write on port completion + - `invalidateWorkflowCache(workflowId)`: manual invalidation + - Encapsulate fingerprint computation and serialization +- [ ] Refactor `WorkflowExecutionService.computeCachedOutputs()` to use service +- [ ] Refactor `PortCompletedHandler` to use service +- [ ] (Optional) Add REST endpoints in `WorkflowExecutionsResource` that delegate to service + +#### 1.2 Testing & Validation +- [ ] Verify downstream cached URI consumption across all operator types +- [ ] Add integration tests: cache upsert → DB verification +- [ ] Add E2E tests: run → cache → rerun → verify skip +- [ ] Clean up state 
hierarchy for cached regions (confirm shallow hierarchy) +- [ ] Verify tuple count accuracy in cache metadata + +### Phase 2: Cost Model Development (Research) +- [ ] Analyze historical runtime_statistics data for cost patterns +- [ ] Implement cost estimation for operator execution (based on historical stats) +- [ ] Implement cache I/O cost model (storage backend dependent) +- [ ] Develop pruning heuristics (size thresholds, reuse patterns) +- [ ] Integrate cost model with `DefaultCostEstimator` +- [ ] Evaluate cache decisions: cost-based vs always-cache vs never-cache + +### Phase 3: Lifecycle Management (Research - Optional) +- [ ] Design cost-aware eviction policy +- [ ] Implement storage quota management +- [ ] Add garbage collection for deleted workflows +- [ ] Evaluate eviction policy effectiveness + +### Phase 4: Evaluation & Publication +- [ ] Design and implement benchmark workloads +- [ ] Run experiments: measure speedup, overhead, hit rates +- [ ] Scalability analysis (varying workflow sizes) +- [ ] Write research paper (target: SIGMOD/VLDB or workshop) + +## Open Research Questions +1. **Cost model accuracy**: How well can historical stats predict future execution costs? Does operator heterogeneity require per-operator-type models? +2. **Reuse patterns**: Can we predict which outputs will be reused based on user interaction patterns? +3. **Source change detection**: For external data sources (files, databases), how to efficiently detect changes without explicit versioning? +4. **Cross-workflow sharing**: Is global cache sharing (same subDAG across workflows) worth the complexity? +5. **Incremental maintenance**: When source data changes slightly, can we update cache incrementally vs full invalidation? 
+ +## Publication Strategy +**Primary target**: Cost-aware caching framework for incremental workflow execution + +**Minimum viable contributions** (Workshop/Demo paper): +- Pasta extension for cache-aware scheduling +- Fingerprint-based correctness +- Working prototype with evaluation + +**Full paper contributions** (SIGMOD/VLDB): +- Above + cost model with formal problem definition +- Cost-aware eviction (lifecycle management) +- Comprehensive evaluation with multiple baselines + +**Alternative positioning**: +- Demo paper: Focus on Texera integration and user experience +- Industrial track: Deployment story with real user workloads
