This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 23948379b12b4f7fd0922d3905a3eb9acdd229ce
Author: Xiaozhen Liu
AuthorDate: Mon Jan 12 15:00:38 2026 -0800

    feat(cache): add design doc.
---
 docs/operator-port-cache.md | 62 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
new file mode 100644
index 0000000000..5fc000c583

# Operator Port Result Cache Design

## Objective
Reuse the previously materialized results of operator output ports without modifying the physical plan. The physical plan remains immutable. Reuse decisions are made by the scheduler (Pasta / `CostBasedScheduleGenerator`) from cache metadata keyed by a deterministic fingerprint of each port's upstream sub-DAG.

## Data Model
- Table `operator_port_cache` (primary key `(workflow_id, global_port_id, subdag_hash)`):
  - `fingerprint_json`: canonical JSON of the upstream sub-DAG.
  - `subdag_hash`: SHA-256 of `fingerprint_json`.
  - `result_uri`: materialization URI.
  - `tuple_count` (optional), `source_execution_id` (optional), and timestamps.
- `global_port_id` uses the existing `GlobalPortIdentity` serialization.
- Status: schema and migration added (`sql/texera_ddl.sql`, `sql/updates/16.sql`).

## Fingerprint
- Utility: `FingerprintUtil.computeSubdagFingerprint(physicalPlan, globalPortId) -> (fingerprintJson, subdagHash)`.
- Canonical payload (sorted):
  - Target port ID.
  - Upstream physical operators, with their exec init info (proto string) and output schemas (string form).
  - Edges between those operators.
- Hash: SHA-256 of the payload JSON.

## End-to-End Workflow
1) **Lookup before execution (mark Δ ports)**
   - Compile to `PhysicalPlan`.
   - For each materializable output port (internal or external), compute its fingerprint.
   - Query `operator_port_cache` by `(workflow_id, global_port_id, subdag_hash)` and collect the hits into `cachedOutputs`.
   - `WorkflowSettings` carries `cachedOutputs` keyed by the serialized `GlobalPortIdentity`, which avoids custom map-key deserialization.

2) **Scheduler integration (Pasta)**
   - Inputs: the physical plan, `cachedOutputs` (Δ), and the visible ports (☐).
   - Classify regions: a region is `must-execute` if it contains a visible port without a cache hit or depends on an uncached materialization. All remaining regions are `cached`.
   - Cost model: cached regions cost 0, executing operators cost more than 0, and materialization reads/writes incur small fixed costs.
   - The schedule marks each region as cached or must-execute. The runtime skips cached regions and uses the cached URIs for materialized reads.

3) **Runtime behavior**
   - Cached regions: skip operator execution, mark the operators completed, and ensure downstream readers use the cached `result_uri`.
   - Must-execute regions: run normally. On output port completion, compute the fingerprint and upsert `operator_port_cache` with the hash, fingerprint, and URI (plus the tuple count, if available).

4) **API/Helpers**
   - `WorkflowExecutionsResource`: `getResultUriByPhysicalPortId`, `upsertOperatorPortCache`.
   - Optional cache DAO/service wrapper for cleaner call sites.

5) **Testing**
   - Fingerprint determinism and change detection.
   - Cache lookup integration (insert, then retrieve by hash).
   - Region classification tests for Δ/☐ combinations.
   - End-to-end: run → populate cache → rerun → verify that cached regions are skipped and results are served from the cache.

## Current Progress (checkpoint)
- Schema and migration added.
- `FingerprintUtil` implemented and covered by workflow-based specs.
- Submission-time cache lookup wired: `WorkflowExecutionService` fingerprints all physical output ports, queries the cache, and stores the hits in `WorkflowSettings.cachedOutputs` (keyed by serialized `GlobalPortIdentity`).
- Cache upsert on output port completion in `PortCompletedHandler` (guarded on plan and URI being present; tuple count is best-effort).
- `WorkflowExecutionsResource` exposes lookup/upsert helpers (`getResultUriByPhysicalPortId`, `getOperatorPortCache`, `upsertOperatorPortCache`). The cache maps currently use stringified port IDs to avoid Jackson map-key serde issues.
- The scheduler/runtime path now reuses cached materializations:
  - `CostBasedScheduleGenerator` marks a region cached when all of its required outputs have cache hits, reuses the cached URIs (and cached tuple counts) in port configs, and makes cached regions emit stats with the cached counts.
  - `WorkflowExecutionCoordinator`/`RegionExecutionCoordinator` short-circuit cached regions, mark their ports/workers completed, emit stats, and propagate the cached URIs downstream.
  - Completion notification remains aligned with the normal [...]

## Next Actions
- Refine the region classification and cost model (Δ/☐ rules), and ensure that cached vs. must-execute decisions align with visibility and materialization needs.
- Tighten runtime semantics: double-check downstream consumption of cached URIs and UI exposure for visible ports, and consider marking worker/region states for observability.
- Refine the cache upsert if needed (tuple-count accuracy, avoiding duplicate writes).
- Add integration and end-to-end tests for the cache lookup/reuse paths and scheduler decisions.
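
The Fingerprint section can be illustrated with a minimal Python sketch. It is not the `FingerprintUtil` implementation. It assumes a toy plan model: `Operator`, `Edge`, `compute_subdag_fingerprint`, the `"op:port"` port-ID string, and the JSON field names are all hypothetical stand-ins for `PhysicalPlan`, `GlobalPortIdentity`, and `FingerprintUtil.computeSubdagFingerprint`. The sketch collects the target operator and everything upstream of it, sorts the operators and edges, serializes the payload as canonical JSON, and hashes it with SHA-256.

```python
import hashlib
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class Operator:
    # Hypothetical stand-in for a physical operator.
    op_id: str
    exec_init_info: str  # e.g. the proto string of the exec init info
    output_schema: str   # string form of the output schema


@dataclass(frozen=True)
class Edge:
    # Hypothetical stand-in for a physical link between two operator ports.
    from_op: str
    from_port: int
    to_op: str
    to_port: int


def upstream_operators(target_op, edges):
    """Return the ids of target_op and every operator that (transitively) feeds it."""
    seen = {target_op}
    stack = [target_op]
    while stack:
        current = stack.pop()
        for e in edges:
            if e.to_op == current and e.from_op not in seen:
                seen.add(e.from_op)
                stack.append(e.from_op)
    return seen


def compute_subdag_fingerprint(operators, edges, target_op, target_port):
    """Return (fingerprint_json, subdag_hash) for one output port (hypothetical API)."""
    by_id = {op.op_id: op for op in operators}
    sub_ids = upstream_operators(target_op, edges)
    payload = {
        "targetPort": f"{target_op}:{target_port}",
        # Operators sorted by id, so plan iteration order does not matter.
        "operators": [
            {
                "id": op_id,
                "execInitInfo": by_id[op_id].exec_init_info,
                "outputSchema": by_id[op_id].output_schema,
            }
            for op_id in sorted(sub_ids)
        ],
        # Only edges whose endpoints both lie inside the sub-DAG, sorted.
        "edges": sorted(
            [e.from_op, e.from_port, e.to_op, e.to_port]
            for e in edges
            if e.from_op in sub_ids and e.to_op in sub_ids
        ),
    }
    # sort_keys plus fixed separators yield one canonical string per payload.
    fingerprint_json = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    subdag_hash = hashlib.sha256(fingerprint_json.encode("utf-8")).hexdigest()
    return fingerprint_json, subdag_hash
```

Because the hash covers only the target port's upstream sub-DAG, editing an operator downstream of the port leaves its cache key unchanged. Editing any upstream operator's exec init info or schema produces a new `subdag_hash` and therefore a cache miss.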

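Step 2 of the End-to-End Workflow (region classification) can be read in more than one way. What follows is a minimal Python sketch of one reading, not the `CostBasedScheduleGenerator` logic, and it ignores the cost model. It makes these assumptions:

- A hypothetical `Region` record lists the ports it produces and the ports it reads from upstream regions.
- Regions arrive in topological order.
- A port is *required* if it is visible (☐) or read by a must-execute region.

The sketch walks the regions in reverse topological order. A region must execute if any of its required outputs is missing from the cached set (Δ); otherwise it is labeled cached. This matches the "cached when all required outputs have cache hits" rule from the progress notes.

```python
from dataclasses import dataclass, field


@dataclass
class Region:
    # Hypothetical region model; ports are plain strings such as "op:port".
    name: str
    outputs: set = field(default_factory=set)  # ports this region produces
    inputs: set = field(default_factory=set)   # ports it reads from upstream regions


def classify_regions(regions_topo, cached_ports, visible_ports):
    """Return {region name: "cached" | "must-execute"} (hypothetical rule).

    regions_topo must list producers before consumers.
    """
    cached = set(cached_ports)
    required = set(visible_ports)  # visible ports are always needed
    result = {}
    # Consumers are decided before producers, so `required` already holds
    # every port a must-execute consumer will read when we reach its producer.
    for region in reversed(regions_topo):
        needed = region.outputs & required
        if needed - cached:
            result[region.name] = "must-execute"
            # An executing region needs all of its inputs (cached or produced).
            required |= region.inputs
        else:
            # Every needed output is cached, or nothing downstream needs it.
            result[region.name] = "cached"
    return result
```

In a chain A → B → C where only `agg:0` (C's output) is visible and only `filter:0` (B's output) is cached, C must run. B is skipped because C can read `filter:0` from its cached URI. A is skipped because no executing region needs `scan:0`.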