This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 16dfe97b7adc7f80b41e0eb0246f75976ba7a790
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Jan 12 15:42:02 2026 -0800

    feat(cache): update design doc.
---
 docs/operator-port-cache.md | 670 +++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 630 insertions(+), 40 deletions(-)

diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index 5fc000c583..cc1d760437 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -1,7 +1,50 @@
 # Operator Port Result Cache Design
 
 ## Objective
-Reuse previously materialized operator output ports without modifying the 
physical plan. The physical plan remains immutable; reuse decisions are made by 
the scheduler (Pasta / CostBasedScheduleGenerator) based on cache metadata 
keyed by a deterministic fingerprint of the upstream sub‑DAG.
+
+Enable **incremental workflow execution** through cost-aware caching of 
operator output ports. When users iteratively refine workflows (modify 
parameters, add/remove operators), the system should:
+
+1. **Reuse cached results** where upstream computation is unchanged
+2. **Make cost-aware decisions** comparing cache read cost vs recomputation 
cost
+3. **Maintain correctness** via deterministic fingerprinting of upstream 
sub-DAGs
+4. **Preserve physical plan immutability** — caching is scheduler metadata, 
not plan modification
+
+**Key principle**: The physical plan remains unchanged; cache-or-recompute 
decisions are made by the scheduler (Pasta / CostBasedScheduleGenerator) based 
on cache metadata keyed by a deterministic SHA-256 fingerprint of the upstream 
sub-DAG.
+
+**Research goals**:
+- Extend Pasta scheduler with cost-based caching decisions
+- Develop cost models and pruning heuristics for what/when to cache
+- Evaluate speedup on iterative data science workflows
+- (Optional) Lifecycle management: eviction, invalidation, garbage collection
+
+## Key Design Decisions
+
+### 1. Physical Plan Immutability
+The physical plan is never modified to accommodate caching. Cache decisions 
are metadata passed to the scheduler via `WorkflowSettings.cachedOutputs`. This 
preserves the integrity of the Pasta scheduling framework.
+
+### 2. Fingerprint-Based Correctness
+Cache keys use SHA-256 hashes of canonical upstream sub-DAG representations 
(operators + schemas + edges + init info). This ensures:
+- Deterministic matching: same computation = same fingerprint
+- Automatic invalidation: any upstream change = different fingerprint = cache 
miss
+- No explicit dependency tracking needed
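+
+A minimal sketch of the matching behaviour listed above, assuming a simplified string payload instead of the JSON payload that `FingerprintUtil` produces. `OpNode`, `canonicalPayload`, and `sha256Hex` are hypothetical names used only for illustration:
+
+```scala
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+
+object FingerprintSketch {
+  // Hypothetical, simplified stand-in for one operator of the upstream sub-DAG.
+  final case class OpNode(id: String, config: String, outputSchema: String)
+
+  // Canonical form: operators and edges are sorted so that insertion order
+  // does not influence the resulting hash.
+  def canonicalPayload(ops: Seq[OpNode], edges: Seq[(String, String)]): String = {
+    val opPart = ops.sortBy(_.id).map(o => s"${o.id}|${o.config}|${o.outputSchema}").mkString(";")
+    val edgePart = edges.sorted.map { case (from, to) => s"$from->$to" }.mkString(";")
+    s"ops=[$opPart] edges=[$edgePart]"
+  }
+
+  // SHA-256 of the UTF-8 payload, rendered as lowercase hex.
+  def sha256Hex(payload: String): String =
+    MessageDigest
+      .getInstance("SHA-256")
+      .digest(payload.getBytes(StandardCharsets.UTF_8))
+      .map(b => "%02x".format(b))
+      .mkString
+
+  def main(args: Array[String]): Unit = {
+    val scan = OpNode("scan", "file=a.csv", "id:int")
+    val filter = OpNode("filter", "id > 10", "id:int")
+    val edges = Seq("scan" -> "filter")
+
+    val original = sha256Hex(canonicalPayload(Seq(scan, filter), edges))
+    val reordered = sha256Hex(canonicalPayload(Seq(filter, scan), edges))
+    val modified = sha256Hex(canonicalPayload(Seq(scan, filter.copy(config = "id > 20")), edges))
+
+    // Same computation gives the same fingerprint; an upstream change gives a miss.
+    println(s"hit=${original == reordered}, miss=${original != modified}")
+  }
+}
+```
+
+Sorting before hashing is the part that makes the key deterministic; the real canonical form also covers schemas and init info as listed above.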
+
+### 3. Region Homogeneity Constraint
+A region is either fully cached (ToSkip) or fully executed (ToExecute) — no 
partial execution within a region. This simplifies:
+- Scheduler logic (binary cached flag per region)
+- Runtime state management (consistent execution mode)
+- Cost model (compare full region costs)
+
+### 4. Shallow State Hierarchy for Cached Regions
+ToSkip regions create lightweight state structures (Workflow → Region → 
Operator/Link) without Worker/Channel states. This is consistent with 
`numWorkers=0` and avoids synthetic worker lifecycle management.
+
+### 5. Stats Emission via Direct Client Updates
+Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via 
`asyncRPCClient.sendToClient()`. This reuses existing stats infrastructure 
without special-casing the frontend.
+
+### 6. No Explicit Cache Flag in Metrics
+Cached execution is inferred from `numWorkers=0` + instant completion, rather 
than adding a `from_cache` flag to protobuf messages. This minimizes protocol 
changes.
+
+### 7. Deferred Lifecycle Management
+V1 assumes unlimited storage. Eviction, TTL, and garbage collection are 
research topics for future work, not implementation blockers.
 
 ## Data Model
 - Table `operator_port_cache` (PK `(workflow_id, global_port_id, 
subdag_hash)`):
@@ -21,42 +64,589 @@ Reuse previously materialized operator output ports 
without modifying the physic
 - Hash: SHA-256 of the payload JSON.
 
 ## End-to-End Workflow
-1) **Lookup before execution (mark Δ ports)**
-   - Compile to `PhysicalPlan`.
-   - For each materializable output port (internal/external), compute 
fingerprint.
-   - Query `operator_port_cache` by `(workflow_id, global_port_id, 
subdag_hash)`; collect hits into `cachedOutputs`.
-   - `WorkflowSettings` carries `cachedOutputs` keyed by serialized 
`GlobalPortIdentity` to avoid custom map key deserialization.
-
-2) **Scheduler integration (Pasta)**
-   - Inputs: physical plan, `cachedOutputs` (Δ), visible ports (☐).
-   - Classify regions: `must-execute` if they contain visible ports without 
cache or depend on uncached materializations; remaining regions are `cached`.
-   - Cost model: cached regions cost 0; executing operators >0; 
materialization reads/writes small fixed costs.
-   - Schedule marks cached vs must-execute; runtime skips cached regions and 
uses cached URIs for materialized reads.
-
-3) **Runtime behavior**
-   - Cached regions: skip operator execution; mark operators completed; ensure 
downstream readers use cached `result_uri`.
-   - Must-execute regions: run normally; on output port completion, compute 
fingerprint and upsert `operator_port_cache` with hash/fingerprint/URI (tuple 
count if available).
-
-4) **API/Helpers**
-   - `WorkflowExecutionsResource`: `getResultUriByPhysicalPortId`, 
`upsertOperatorPortCache`.
-   - Optional cache DAO/service wrapper for cleaner calls.
-
-5) **Testing**
-   - Fingerprint determinism/change detection.
-   - Cache lookup integration (insert + retrieve by hash).
-   - Region classification tests for Δ/☐ combinations.
-   - End-to-end: run → populate cache → rerun → verify cached regions are 
skipped and results served from cache.
-
-## Current Progress (checkpoint)
-- Schema/migration added.
-- `FingerprintUtil` implemented and covered with workflow-based specs.
-- Submission-time cache lookup wired: `WorkflowExecutionService` fingerprints 
all physical output ports, queries cache, and stores hits in 
`WorkflowSettings.cachedOutputs` (keyed by serialized `GlobalPortIdentity`).
-- Cache upsert on output port completion in `PortCompletedHandler` (guarded on 
plan + URI; tuple count best-effort).
-- `WorkflowExecutionsResource` exposes lookup/upsert helpers 
(`getResultUriByPhysicalPortId`, `getOperatorPortCache`, 
`upsertOperatorPortCache`); cache maps currently use stringified port IDs to 
avoid Jackson map key serde issues.
-- Scheduler/runtime path now reuses cached materializations: 
`CostBasedScheduleGenerator` marks regions cached when all required outputs 
have cache hits, reuses cached URIs (and cached tuple counts) in port configs, 
and cached regions emit stats with cached counts; 
`WorkflowExecutionCoordinator`/`RegionExecutionCoordinator` short-circuit 
cached regions, mark ports/workers completed, emit stats, and propagate cached 
URIs downstream. Completion notification remains aligned with the normal  [...]
-
-## Next Actions
-- Refine region classification/cost model (Δ/☐ rules) and ensure cached vs 
must-execute decisions align with visibility/materialization needs.
-- Tighten runtime semantics: double-check downstream consumption of cached 
URIs and UI exposure for visible ports; consider marking worker/region states 
for observability.
-- Refine cache upsert if needed (tuple count accuracy, avoid duplicate writes).
-- Add integration and end-to-end tests for cache lookup/reuse paths and 
scheduler decisions.
+
+### 1. Cache Lookup (Submission Time)
+**Location**: `WorkflowExecutionService` → `OperatorPortCacheService` → 
`OperatorPortCacheDao`
+
+- Compile logical workflow to `PhysicalPlan`
+- Call `OperatorPortCacheService.lookupCachedOutputs(workflowId, 
physicalPlan)`:
+  - For each materializable output port (internal/external):
+    - Compute fingerprint via 
`FingerprintUtil.computeSubdagFingerprint(physicalPlan, globalPortId)`
+    - Query `operator_port_cache` table via DAO by `(workflow_id, 
global_port_id, subdag_hash)`
+  - Return `Map[GlobalPortIdentity, CachedOutput]` with all cache hits
+- Convert to serialized form: `Map[String, CachedOutput]` (keyed by serialized 
`GlobalPortIdentity` to avoid Jackson map key deserialization issues)
+- Store in `WorkflowSettings.cachedOutputs` and pass to scheduler
+
+### 2. Scheduler Integration (Pasta / CostBasedScheduleGenerator)
+**Location**: `CostBasedScheduleGenerator`
+
+**Inputs**:
+- Physical plan (immutable)
+- `cachedOutputs` (Δ): map of cache hits from step 1
+- Visible ports (☐): ports that must be materialized for user visibility
+
+**Region Classification**:
+- **Homogeneity constraint**: A region is either fully cached (ToSkip) or 
fully executed (ToExecute) — no mixing within a region
+- **ToExecute regions**: Contain visible ports without cache hits OR depend on 
uncached intermediate materializations
+- **ToSkip regions**: All required output ports have cache hits AND no visible 
ports need fresh computation
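+
+The classification rule above can be sketched as follows. This is an illustration under stated assumptions, not the `CostBasedScheduleGenerator` logic: `RegionSketch` and `classify` are hypothetical, visible ports are folded into `requiredOutputPorts`, and a region with no required ports is treated as ToExecute.
+
+```scala
+object RegionClassificationSketch {
+  sealed trait RegionMode
+  case object ToSkip extends RegionMode
+  case object ToExecute extends RegionMode
+
+  // Hypothetical, simplified region: the serialized ids of the output ports
+  // (visible or materialized) that the region must provide.
+  final case class RegionSketch(id: String, requiredOutputPorts: Set[String])
+
+  // Homogeneity: the whole region is skipped only if every required output
+  // port has a cache hit; a single miss makes the whole region execute.
+  def classify(region: RegionSketch, cachedPorts: Set[String]): RegionMode =
+    if (region.requiredOutputPorts.nonEmpty && region.requiredOutputPorts.subsetOf(cachedPorts)) ToSkip
+    else ToExecute
+
+  def main(args: Array[String]): Unit = {
+    val cached = Set("scan.out", "filter.out")
+    val fullyCached = RegionSketch("r1", Set("scan.out", "filter.out"))
+    val partiallyCached = RegionSketch("r2", Set("filter.out", "join.out"))
+    println(classify(fullyCached, cached))
+    println(classify(partiallyCached, cached))
+  }
+}
+```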
+
+**Cost Model** (currently simple, needs refinement for research):
+- Cached regions: cost = 0
+- Executing regions: cost = (# operators × DEFAULT_OPERATOR_COST) + 
materialization read/write costs
+- Cache read/write: small fixed costs (0.5 per port)
+- **Future**: Historical stats-based estimation from `runtime_statistics` table
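+
+As a worked example of the current cost formula, the sketch below assumes `DEFAULT_OPERATOR_COST` is 1.0 (the actual constant is not stated here) and uses the 0.5 per-port cost from the list above. `regionCost` is a hypothetical helper:
+
+```scala
+object RegionCostSketch {
+  // Assumed value for illustration; the real DEFAULT_OPERATOR_COST may differ.
+  val DefaultOperatorCost: Double = 1.0
+  // Fixed cost per materialized port read or write, as stated in the cost model.
+  val PortIoCost: Double = 0.5
+
+  // Cached regions cost nothing; executing regions pay per operator plus
+  // a fixed amount per materialized port they read or write.
+  def regionCost(cached: Boolean, numOperators: Int, numMaterializedPorts: Int): Double =
+    if (cached) 0.0
+    else numOperators * DefaultOperatorCost + numMaterializedPorts * PortIoCost
+
+  def main(args: Array[String]): Unit = {
+    // 3 operators and 2 materialized ports: 3 * 1.0 + 2 * 0.5 = 4.0
+    println(regionCost(cached = false, numOperators = 3, numMaterializedPorts = 2))
+    // The same region served from cache: 0.0
+    println(regionCost(cached = true, numOperators = 3, numMaterializedPorts = 2))
+  }
+}
+```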
+
+**Output**:
+- Schedule with regions marked `cached=true/false`
+- Port configs updated with cached URIs for ToSkip regions
+- Cached tuple counts reused in port metadata
+
+### 3. Runtime Execution
+**Location**: `RegionExecutionCoordinator`, `PortCompletedHandler`
+
+#### ToSkip Regions (Cached)
+Entry point: `RegionExecutionCoordinator` constructor branches on 
`region.cached` flag
+
+**Execution path**:
+1. **Skip operator execution**: Call `completeCachedRegion()` immediately
+2. **State hierarchy** (shallow):
+   - Create: Workflow → Region → Operator/Link states
+   - Skip: Worker/Channel states (not needed)
+3. **Mark ports completed**: Set port status to COMPLETED with cached URI
+4. **Emit synthetic stats** via 
`asyncRPCClient.sendToClient(ExecutionStatsUpdate(...))`:
+   - `numWorkers = 0`
+   - `dataProcessingTime = 0`, `controlProcessingTime = 0`, `idleTime = 0`
+   - Input/output tuple counts from cached metadata
+5. **Propagate cached URIs**: Downstream operators receive cached `result_uri` 
for materialized inputs
+6. **No WorkerAssignmentUpdate**: Cached regions don't send worker assignment 
events (consistent with numWorkers=0)
+7. **Set phase to Completed**: Region lifecycle completes immediately
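+
+The synthetic stats in step 4 can be pictured with the sketch below. `OperatorStatsSketch` is a hypothetical stand-in, not the real protobuf message, and falling back to 0 when a cached tuple count is missing is an assumption:
+
+```scala
+object CachedRegionStatsSketch {
+  // Hypothetical stand-in for the per-operator stats payload.
+  final case class OperatorStatsSketch(
+      numWorkers: Int,
+      dataProcessingTime: Long,
+      controlProcessingTime: Long,
+      idleTime: Long,
+      inputTupleCount: Long,
+      outputTupleCount: Long
+  )
+
+  // Cached tuple counts are best-effort, so a missing count falls back to 0.
+  def syntheticStats(cachedInputCount: Option[Long], cachedOutputCount: Option[Long]): OperatorStatsSketch =
+    OperatorStatsSketch(
+      numWorkers = 0,
+      dataProcessingTime = 0L,
+      controlProcessingTime = 0L,
+      idleTime = 0L,
+      inputTupleCount = cachedInputCount.getOrElse(0L),
+      outputTupleCount = cachedOutputCount.getOrElse(0L)
+    )
+
+  // A consumer can infer cached execution without a dedicated flag.
+  def looksCached(stats: OperatorStatsSketch): Boolean =
+    stats.numWorkers == 0 && stats.dataProcessingTime == 0L && stats.controlProcessingTime == 0L
+
+  def main(args: Array[String]): Unit = {
+    val stats = syntheticStats(Some(1000L), Some(250L))
+    println(s"$stats cached=${looksCached(stats)}")
+  }
+}
+```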
+
+#### ToExecute Regions (Normal Execution)
+**Location**: `PortCompletedHandler` → `OperatorPortCacheService` → 
`OperatorPortCacheDao`
+
+1. **Execute operators**: Normal execution path via worker actors
+2. **On output port completion** (`PortCompletedHandler`):
+   - Call `OperatorPortCacheService.upsertCachedOutput(...)`:
+     - Compute fingerprint via 
`FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId)`
+     - Retrieve tuple count (best-effort via 
`DocumentFactory.openDocument(uri).getCount`)
+     - Upsert to `operator_port_cache` table via DAO with:
+       - `workflow_id`, `global_port_id`, `subdag_hash`
+       - `fingerprint_json`, `result_uri`
+       - `tuple_count`, `source_execution_id`, timestamps
+3. **Normal stats emission**: Real execution metrics sent to client
+
+### 4. Client-Side State Management
+**Location**: `ExecutionStatsService`, `ExecutionStateStore`
+
+**Stats propagation**:
+- `ExecutionStatsUpdate` events (from both cached and normal regions) update 
`ExecutionStatsStore`
+- Frontend receives operator metrics via WebSocket subscription
+- Cached regions appear as instantly completed operators with 0 workers and 0 
processing time
+
+**No structural changes needed**:
+- Existing protobuf messages (`OperatorStatistics`) handle cached regions 
naturally
+- No `from_cache` flag required (can infer from numWorkers=0 + instant 
completion)
+- Lifecycle management (eviction, cleanup) deferred to future work
+
+### 5. Service & DAO Architecture
+
+#### OperatorPortCacheDao
+**Location**: 
`/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala`
+
+Low-level database access using Jooq:
+```scala
+class OperatorPortCacheDao(sqlServer: SqlServer) {
+
+  /** Query cache entry by PK (workflow_id, global_port_id, subdag_hash) */
+  def get(
+      workflowId: Long,
+      serializedPortId: String,
+      subdagHash: String
+  ): Option[OperatorPortCacheRecord]
+
+  /** Upsert cache entry (insert or update on conflict) */
+  def upsert(record: OperatorPortCacheRecord): Unit
+
+  /** Delete all cache entries for a workflow */
+  def deleteByWorkflow(workflowId: Long): Unit
+}
+```
+
+#### OperatorPortCacheService
+**Location**: 
`/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
+
+High-level cache operations with business logic:
+```scala
+class OperatorPortCacheService(dao: OperatorPortCacheDao) {
+
+  /** Lookup cached outputs for all ports in a physical plan (submission time) */
+  def lookupCachedOutputs(
+      workflowId: WorkflowIdentity,
+      physicalPlan: PhysicalPlan
+  ): Map[GlobalPortIdentity, CachedOutput]
+
+  /** Upsert cache entry when output port completes (runtime) */
+  def upsertCachedOutput(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity,
+      portId: GlobalPortIdentity,
+      physicalPlan: PhysicalPlan,
+      resultUri: URI,
+      tupleCount: Option[Long]
+  ): Unit
+
+  /** Invalidate all cache entries for a workflow (lifecycle management) */
+  def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit
+
+  /** Future: Cost-aware eviction when storage quota exceeded */
+  def evictLowValueEntries(quotaBytes: Long): Unit
+}
+```
+
+**Key responsibilities**:
+- Encapsulates fingerprint computation (calls `FingerprintUtil`)
+- Handles `GlobalPortIdentity` ↔ String serialization
+- Manages tuple count retrieval (best-effort via `DocumentFactory`)
+- Provides workflow-level abstractions (batch lookup, invalidation)
+
+#### WorkflowExecutionsResource (REST API - Optional)
+**Location**: 
`/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala`
+
+HTTP endpoints for external access (if needed):
+- `GET /cache/{workflowId}/{portId}/{hash}`: Manual cache lookup
+- `DELETE /cache/{workflowId}`: Manual cache invalidation
+
+**Note**: Internal services use `OperatorPortCacheService`, not the REST 
resource.
+
+### 6. Implementation Guide
+
+#### Step 1: Create OperatorPortCacheDao
+**File**: 
`/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala`
+
+```scala
+package org.apache.texera.web.dao
+
+import org.apache.texera.amber.core.virtualidentity.ExecutionIdentity
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_CACHE
+import org.jooq.DSLContext
+
+import java.net.URI
+import scala.jdk.OptionConverters._
+
+case class OperatorPortCacheRecord(
+    workflowId: Long,
+    globalPortId: String,
+    subdagHash: String,
+    fingerprintJson: String,
+    resultUri: URI,
+    tupleCount: Option[Long],
+    sourceExecutionId: Option[Long],
+    createdAt: Long,
+    updatedAt: Long
+)
+
+class OperatorPortCacheDao(sqlServer: SqlServer) {
+  private val context: DSLContext = sqlServer.createDSLContext()
+
+  def get(
+      workflowId: Long,
+      serializedPortId: String,
+      subdagHash: String
+  ): Option[OperatorPortCacheRecord] = {
+    context
+      .select(
+        OPERATOR_PORT_CACHE.WORKFLOW_ID,
+        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
+        OPERATOR_PORT_CACHE.SUBDAG_HASH,
+        OPERATOR_PORT_CACHE.FINGERPRINT_JSON,
+        OPERATOR_PORT_CACHE.RESULT_URI,
+        OPERATOR_PORT_CACHE.TUPLE_COUNT,
+        OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID,
+        OPERATOR_PORT_CACHE.CREATED_AT,
+        OPERATOR_PORT_CACHE.UPDATED_AT
+      )
+      .from(OPERATOR_PORT_CACHE)
+      .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt))
+      .and(OPERATOR_PORT_CACHE.GLOBAL_PORT_ID.eq(serializedPortId))
+      .and(OPERATOR_PORT_CACHE.SUBDAG_HASH.eq(subdagHash))
+      .fetchOptional()
+      .toScala
+      .map { record =>
+        OperatorPortCacheRecord(
+          workflowId = record.value1().longValue(),
+          globalPortId = record.value2(),
+          subdagHash = record.value3(),
+          fingerprintJson = record.value4(),
+          resultUri = URI.create(record.value5()),
+          tupleCount = Option(record.value6()).map(_.longValue()),
+          sourceExecutionId = Option(record.value7()).map(_.longValue()),
+          createdAt = record.value8().longValue(),
+          updatedAt = record.value9().longValue()
+        )
+      }
+  }
+
+  def upsert(record: OperatorPortCacheRecord): Unit = {
+    val dbRecord = context.newRecord(OPERATOR_PORT_CACHE)
+    dbRecord.setWorkflowId(record.workflowId.toInt)
+    dbRecord.setGlobalPortId(record.globalPortId)
+    dbRecord.setSubdagHash(record.subdagHash)
+    dbRecord.setFingerprintJson(record.fingerprintJson)
+    dbRecord.setResultUri(record.resultUri.toString)
+    record.tupleCount.foreach(c => dbRecord.setTupleCount(Long.box(c)))
+    record.sourceExecutionId.foreach(eid => dbRecord.setSourceExecutionId(Long.box(eid)))
+
+    context
+      .insertInto(OPERATOR_PORT_CACHE)
+      .set(dbRecord)
+      .onConflict(
+        OPERATOR_PORT_CACHE.WORKFLOW_ID,
+        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
+        OPERATOR_PORT_CACHE.SUBDAG_HASH
+      )
+      .doUpdate()
+      .set(OPERATOR_PORT_CACHE.RESULT_URI, dbRecord.getResultUri)
+      .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, dbRecord.getFingerprintJson)
+      .set(OPERATOR_PORT_CACHE.TUPLE_COUNT, dbRecord.getTupleCount)
+      .set(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, dbRecord.getSourceExecutionId)
+      .execute()
+  }
+
+  def deleteByWorkflow(workflowId: Long): Unit = {
+    context
+      .deleteFrom(OPERATOR_PORT_CACHE)
+      .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt))
+      .execute()
+  }
+}
+```
+
+#### Step 2: Create OperatorPortCacheService
+**File**: 
`/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
+
+```scala
+package org.apache.texera.web.service
+
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.cache.FingerprintUtil
+import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity, PhysicalPlan}
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
+import org.apache.texera.web.dao.{OperatorPortCacheDao, OperatorPortCacheRecord}
+
+import java.net.URI
+
+class OperatorPortCacheService(dao: OperatorPortCacheDao) {
+
+  /**
+    * Lookup cached outputs for all materializable ports in the physical plan.
+    * Called at workflow submission time.
+    */
+  def lookupCachedOutputs(
+      workflowId: WorkflowIdentity,
+      physicalPlan: PhysicalPlan
+  ): Map[GlobalPortIdentity, CachedOutput] = {
+    physicalPlan.operators
+      .flatMap(op => op.outputPorts.keys.map(pid => GlobalPortIdentity(op.id, pid)))
+      .flatMap { gpid =>
+        val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, gpid)
+        dao.get(workflowId.id, gpid.serializeAsString, fingerprint.subdagHash).map { record =>
+          gpid -> CachedOutput(
+            resultUri = record.resultUri,
+            fingerprintJson = record.fingerprintJson,
+            tupleCount = record.tupleCount,
+            sourceExecutionId = record.sourceExecutionId.map(ExecutionIdentity(_))
+          )
+        }
+      }
+      .toMap
+  }
+
+  /**
+    * Upsert cache entry when an output port completes.
+    * Called by PortCompletedHandler at runtime.
+    */
+  def upsertCachedOutput(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity,
+      portId: GlobalPortIdentity,
+      physicalPlan: PhysicalPlan,
+      resultUri: URI,
+      tupleCount: Option[Long]
+  ): Unit = {
+    val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId)
+    val now = System.currentTimeMillis()
+
+    dao.upsert(
+      OperatorPortCacheRecord(
+        workflowId = workflowId.id,
+        globalPortId = portId.serializeAsString,
+        subdagHash = fingerprint.subdagHash,
+        fingerprintJson = fingerprint.fingerprintJson,
+        resultUri = resultUri,
+        tupleCount = tupleCount,
+        sourceExecutionId = Some(executionId.id),
+        createdAt = now,
+        updatedAt = now
+      )
+    )
+  }
+
+  /**
+    * Invalidate all cache entries for a workflow.
+    * Useful for manual cache clearing or workflow deletion.
+    */
+  def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit = {
+    dao.deleteByWorkflow(workflowId.id)
+  }
+
+  /**
+    * Future: Cost-aware eviction when storage quota is exceeded.
+    */
+  def evictLowValueEntries(quotaBytes: Long): Unit = {
+    // Phase 3: Lifecycle management
+    throw new UnsupportedOperationException("Not yet implemented")
+  }
+}
+```
+
+#### Step 3: Update WorkflowExecutionService
+**File**: 
`/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala`
+
+```scala
+class WorkflowExecutionService(
+    controllerConfig: ControllerConfig,
+    val workflowContext: WorkflowContext,
+    resultService: ExecutionResultService,
+    cacheService: OperatorPortCacheService,  // INJECT HERE
+    request: WorkflowExecuteRequest,
+    val executionStateStore: ExecutionStateStore,
+    errorHandler: Throwable => Unit,
+    userEmailOpt: Option[String],
+    sessionUri: URI
+) extends SubscriptionManager with LazyLogging {
+
+  def executeWorkflow(): Unit = {
+    try {
+      workflow = new WorkflowCompiler(workflowContext).compile(request.logicalPlan)
+
+      // Use cache service for lookup
+      val cachedOutputs = cacheService
+        .lookupCachedOutputs(workflowContext.workflowId, workflow.physicalPlan)
+        .map { case (gpid, cached) => gpid.serializeAsString -> cached }
+
+      workflowContext.workflowSettings =
+        workflowContext.workflowSettings.copy(cachedOutputs = cachedOutputs)
+    } catch {
+      case err: Throwable => errorHandler(err)
+    }
+    // ... rest of method
+  }
+}
+```
+
+#### Step 4: Update PortCompletedHandler
+**File**: 
`/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala`
+
+```scala
+// Inject OperatorPortCacheService into controller/handler
+
+(storageUriOpt, Option(cp.workflowScheduler.physicalPlan)) match {
+  case (Some(uri), Some(plan)) =>
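+    // Best-effort tuple count: Try swallows only non-fatal errors, so fatal
+    // JVM errors (e.g. OutOfMemoryError) still propagate.
+    val tupleCount =
+      scala.util.Try(DocumentFactory.openDocument(uri)._1.getCount).toOption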
+
+    // Use cache service for upsert
+    cacheService.upsertCachedOutput(
+      cp.workflowContext.workflowId,
+      cp.workflowContext.executionId,
+      globalPortId,
+      plan,
+      uri,
+      tupleCount
+    )
+  case _ => // no-op
+}
+```
+
+### 7. Testing Strategy
+- **Unit tests**: Fingerprint determinism, cost model logic, region 
classification
+- **Integration tests**: Cache upsert → DB verification, cache lookup → region 
marking
+- **E2E tests**: Run workflow → populate cache → re-run → verify ToSkip 
behavior and result correctness
+- **Change detection**: Modify operator params → verify fingerprint mismatch → 
cache miss
+
+## Current Implementation Status
+
+### Architecture Layers
+
+**Current (Prototype)**:
+```
+WorkflowExecutionService ───┐
+                            ├──→ WorkflowExecutionsResource (Jooq)
+PortCompletedHandler ───────┘
+```
+
+**Proposed (Clean Architecture)**:
+```
+WorkflowExecutionService ───┐
+                            ├──→ OperatorPortCacheService ──→ OperatorPortCacheDao (Jooq)
+PortCompletedHandler ───────┤
+                            │
+WorkflowExecutionsResource ─┘ (optional REST endpoints)
+```
+
+**Refactoring needed**:
+1. Extract Jooq code from `WorkflowExecutionsResource` → `OperatorPortCacheDao`
+2. Create `OperatorPortCacheService` with workflow-level abstractions
+3. Update service call sites to use `OperatorPortCacheService`
+
+### Completed Components
+- **Schema/migration**: `operator_port_cache` table added 
(`sql/texera_ddl.sql`, `sql/updates/16.sql`)
+- **Fingerprinting**: `FingerprintUtil` implemented with workflow-based specs for deterministic sub-DAG hashing
+- **Submission-time lookup**: `WorkflowExecutionService` computes fingerprints 
for all physical output ports, queries cache, stores hits in 
`WorkflowSettings.cachedOutputs`
+- **Cache persistence**: `PortCompletedHandler` upserts to 
`operator_port_cache` on output port completion (includes fingerprint, URI, 
tuple count)
+- **API layer**: `WorkflowExecutionsResource` exposes cache lookup/upsert 
helpers
+- **Scheduler integration**: `CostBasedScheduleGenerator` marks regions cached 
when all required outputs have hits, reuses cached URIs in port configs
+- **Runtime execution**: `RegionExecutionCoordinator` branches on 
`region.cached` flag:
+  - ToSkip regions: `completeCachedRegion()` creates shallow state hierarchy, 
emits synthetic stats (numWorkers=0, processingTime=0), propagates cached URIs 
downstream
+  - ToExecute regions: normal execution path
+- **Stats emission**: Cached regions emit `ExecutionStatsUpdate` via direct 
client updates, maintaining consistency with normal execution lifecycle
+
+### Architecture Integration
+The cache system integrates with three layers:
+1. **Execution Planning Layer**: Cache lookup at workflow submission, 
fingerprint computation
+2. **Scheduler Layer (Pasta)**: Cost-based cache-or-recompute decisions, 
region classification
+3. **Runtime Layer**: Short-circuit execution for cached regions, state 
management, stats emission
+
+## Research Contributions
+
+### 1. Cost Model & Pruning (Primary Contribution)
+**Goal**: Determine when caching is beneficial and what outputs to cache.
+
+**Components**:
+- **Operator cost estimation**: Predict execution cost using historical 
runtime statistics
+- **Cache I/O cost**: Model read/write cost for materialized outputs
+- **Pruning heuristics**:
+  - Skip caching small outputs (recompute is cheap)
+  - Skip terminal operators (no downstream reuse)
+  - Skip low-reuse-probability operators
+- **Cache-or-recompute decision**: Per-region cost comparison (cache read vs 
recompute)
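+
+The three pruning heuristics above could be combined into one predicate, as in this sketch. `OutputCandidate`, both thresholds, and the idea of an estimated reuse probability are hypothetical placeholders; choosing them is part of the open research:
+
+```scala
+object CachePruningSketch {
+  // Hypothetical description of one output port that could be cached.
+  final case class OutputCandidate(
+      portId: String,
+      estimatedBytes: Long,
+      isTerminal: Boolean,
+      reuseProbability: Double
+  )
+
+  // Placeholder thresholds, chosen only for illustration.
+  val MinBytesWorthCaching: Long = 1L << 20 // 1 MiB
+  val MinReuseProbability: Double = 0.2
+
+  // Cache only outputs that are large enough, have downstream consumers,
+  // and are reasonably likely to be reused.
+  def shouldCache(c: OutputCandidate): Boolean =
+    c.estimatedBytes >= MinBytesWorthCaching &&
+      !c.isTerminal &&
+      c.reuseProbability >= MinReuseProbability
+
+  def main(args: Array[String]): Unit = {
+    val candidates = Seq(
+      OutputCandidate("join.out", 64L << 20, isTerminal = false, reuseProbability = 0.8),
+      OutputCandidate("tiny.out", 4096L, isTerminal = false, reuseProbability = 0.9),
+      OutputCandidate("sink.out", 64L << 20, isTerminal = true, reuseProbability = 0.8)
+    )
+    println(candidates.filter(shouldCache).map(_.portId))
+  }
+}
+```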
+
+**Current status**: Simple cost model (cached=0, execute>0). Historical stats-based estimation still needs to be developed.
+
+**Data source**: `runtime_statistics` table already captures execution metrics 
(data/control processing time, tuple counts, worker counts per operator).
+
+### 2. Result Lifecycle Management (Secondary Contribution)
+**Goal**: Maintain cache correctness and efficiency over time.
+
+**Components**:
+- **Cache eviction**: When storage is limited, prioritize keeping entries with a high recompute-cost to storage-cost ratio
+- **Invalidation**: Fingerprint-based (already handles operator param changes)
+- **Source change detection**: Track external data source versions (deferred - 
hard problem)
+- **Garbage collection**: Clean up cache entries for deleted 
workflows/executions
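+
+One possible shape for the ratio-based eviction idea, as a sketch only. `CacheEntrySketch` and `selectEvictions` are hypothetical, and the greedy lowest-ratio-first order is an assumption rather than a settled policy:
+
+```scala
+object EvictionSketch {
+  // Hypothetical cache entry with its storage footprint and estimated recompute cost.
+  final case class CacheEntrySketch(portId: String, sizeBytes: Long, recomputeCost: Double)
+
+  // Value of keeping an entry: recompute cost saved per stored byte.
+  private def valuePerByte(e: CacheEntrySketch): Double =
+    e.recomputeCost / math.max(e.sizeBytes, 1L)
+
+  // Greedily evict the lowest-value entries until usage fits within the quota.
+  def selectEvictions(entries: Seq[CacheEntrySketch], quotaBytes: Long): Seq[CacheEntrySketch] = {
+    val totalBytes = entries.map(_.sizeBytes).sum
+    val lowestValueFirst = entries.sortWith((a, b) => valuePerByte(a) < valuePerByte(b))
+    val (evicted, _) = lowestValueFirst.foldLeft((Vector.empty[CacheEntrySketch], totalBytes)) {
+      case ((acc, used), e) if used > quotaBytes => (acc :+ e, used - e.sizeBytes)
+      case (state, _)                            => state
+    }
+    evicted
+  }
+
+  def main(args: Array[String]): Unit = {
+    val entries = Seq(
+      CacheEntrySketch("a", 100L, 10.0),
+      CacheEntrySketch("b", 100L, 1.0),
+      CacheEntrySketch("c", 100L, 5.0)
+    )
+    // 300 bytes stored against a 150-byte quota: the two cheapest-to-recompute entries go first.
+    println(selectEvictions(entries, quotaBytes = 150L).map(_.portId))
+  }
+}
+```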
+
+**Current status**: Unlimited storage is assumed. Lifecycle management is deferred beyond the initial implementation.
+
+**Future considerations**:
+- Cost-aware eviction policies (LRU/LFU variants)
+- Source versioning for common sources (files, databases)
+- Cross-workflow cache sharing (global fingerprint registry)
+
+## Evaluation Plan
+
+### Benchmark Workloads
+1. **Linear pipeline**: A → B → C → D (sequential operators)
+2. **Diamond pattern**: A → B, A → C, B+C → D (shared prefix)
+3. **Multiple branches**: Complex DAGs with shared subgraphs
+4. **UDF-heavy workflows**: Expensive computation (Python/R operators)
+
+### Metrics
+- **Execution time**: Full execution vs cached re-execution
+- **Cache overhead**: Fingerprint computation + DB lookup latency
+- **Storage cost**: Cache entry sizes across workload types
+- **Hit rate**: Percentage of regions served from cache
+
+### Experiments
+1. **Baseline**: Execute workflow without caching
+2. **Cold cache**: First execution (populate cache, measure overhead)
+3. **Warm cache**: Re-execution (all hits, measure speedup)
+4. **Partial invalidation**: Modify one operator, re-execute (measure partial 
hit benefit)
+5. **Scalability**: Vary workflow size (10, 50, 100, 200 operators)
+
+### Comparison Baselines
+- Spark RDD persistence (in-memory, no cross-execution)
+- Manual materialization (user-defined intermediate saves)
+- No caching (full re-execution)
+
+## Next Steps
+
+### Phase 1: Complete Prototype (Engineering)
+
+#### 1.1 Refactor to Service/DAO Architecture
+- [ ] Create `OperatorPortCacheDao` with get/upsert/delete methods
+  - Extract Jooq code from `WorkflowExecutionsResource`
+  - Define `OperatorPortCacheRecord` case class
+  - Add unit tests for DAO operations
+- [ ] Create `OperatorPortCacheService` with high-level methods
+  - `lookupCachedOutputs(workflowId, physicalPlan)`: batch lookup at submission
+  - `upsertCachedOutput(...)`: cache write on port completion
+  - `invalidateWorkflowCache(workflowId)`: manual invalidation
+  - Encapsulate fingerprint computation and serialization
+- [ ] Refactor `WorkflowExecutionService.computeCachedOutputs()` to use service
+- [ ] Refactor `PortCompletedHandler` to use service
+- [ ] (Optional) Add REST endpoints in `WorkflowExecutionsResource` that 
delegate to service
+
+#### 1.2 Testing & Validation
+- [ ] Verify downstream cached URI consumption across all operator types
+- [ ] Add integration tests: cache upsert → DB verification
+- [ ] Add E2E tests: run → cache → rerun → verify skip
+- [ ] Clean up state hierarchy for cached regions (confirm shallow hierarchy)
+- [ ] Verify tuple count accuracy in cache metadata
+
+### Phase 2: Cost Model Development (Research)
+- [ ] Analyze historical runtime_statistics data for cost patterns
+- [ ] Implement cost estimation for operator execution (based on historical 
stats)
+- [ ] Implement cache I/O cost model (storage backend dependent)
+- [ ] Develop pruning heuristics (size thresholds, reuse patterns)
+- [ ] Integrate cost model with `DefaultCostEstimator`
+- [ ] Evaluate cache decisions: cost-based vs always-cache vs never-cache
+
+### Phase 3: Lifecycle Management (Research - Optional)
+- [ ] Design cost-aware eviction policy
+- [ ] Implement storage quota management
+- [ ] Add garbage collection for deleted workflows
+- [ ] Evaluate eviction policy effectiveness
+
+### Phase 4: Evaluation & Publication
+- [ ] Design and implement benchmark workloads
+- [ ] Run experiments: measure speedup, overhead, hit rates
+- [ ] Scalability analysis (varying workflow sizes)
+- [ ] Write research paper (target: SIGMOD/VLDB or workshop)
+
+## Open Research Questions
+1. **Cost model accuracy**: How well can historical stats predict future 
execution costs? Does operator heterogeneity require per-operator-type models?
+2. **Reuse patterns**: Can we predict which outputs will be reused based on 
user interaction patterns?
+3. **Source change detection**: For external data sources (files, databases), how can changes be detected efficiently without explicit versioning?
+4. **Cross-workflow sharing**: Is global cache sharing (same sub-DAG across workflows) worth the complexity?
+5. **Incremental maintenance**: When source data changes slightly, can the cache be updated incrementally instead of being fully invalidated?
+
+## Publication Strategy
+**Primary target**: Cost-aware caching framework for incremental workflow 
execution
+
+**Minimum viable contributions** (Workshop/Demo paper):
+- Pasta extension for cache-aware scheduling
+- Fingerprint-based correctness
+- Working prototype with evaluation
+
+**Full paper contributions** (SIGMOD/VLDB):
+- Above + cost model with formal problem definition
+- Cost-aware eviction (lifecycle management)
+- Comprehensive evaluation with multiple baselines
+
+**Alternative positioning**:
+- Demo paper: Focus on Texera integration and user experience
+- Industrial track: Deployment story with real user workloads
