This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit a8057585fe9606ec24beee82c36d21641b54dc32 Author: Xiaozhen Liu <[email protected]> AuthorDate: Wed Jan 14 14:08:50 2026 -0800 feat(cache): update doc and number display for operators completing from cache. --- .../scheduling/RegionExecutionCoordinator.scala | 17 +++++---- docs/operator-port-cache.md | 40 ++++++++++++++++++++-- .../workspace/service/joint-ui/joint-ui.service.ts | 11 +++--- 3 files changed, 52 insertions(+), 16 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 9f7fe80adb..cc409ea098 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -137,15 +137,14 @@ class RegionExecutionCoordinator( region.getOperators.foreach { op => val opExecution = regionExecution.initOperatorExecution(op.id) // Cached regions do not create workers; synthesize operator-level metrics instead. - val outputMetrics = op.outputPorts.keys.map { pid => - val count = resourceConfig.portConfigs - .collectFirst { - case (gpid, cfg: OutputPortConfig) if gpid == GlobalPortIdentity(op.id, pid) => - cfg.cachedTupleCount.getOrElse(0L) - } - .getOrElse(0L) - PortTupleMetricsMapping(pid, TupleMetrics(count, 0L)) - }.toSeq + val outputMetrics = resourceConfig.portConfigs + .collect { + case (gpid, cfg: OutputPortConfig) if gpid.opId == op.id => + // Only emit metrics for materialized outputs; UI treats missing ports as skipped. + val count = cfg.cachedTupleCount.getOrElse(0L) + PortTupleMetricsMapping(gpid.portId, TupleMetrics(count, 0L)) + } + .toSeq val inputMetrics = op.inputPorts.keys .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L))) .toSeq diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index 940d0490fa..8fab7a1a59 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -40,8 +40,8 @@ ToSkip regions create lightweight state structures (Workflow → Region → Oper ### 5. Stats Emission via Direct Client Updates Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via `asyncRPCClient.sendToClient()`, with cached operator metrics (`numWorkers=0`). This reuses existing stats infrastructure without special-casing the frontend. -### 6. No Explicit Cache Flag in Metrics -Cached execution is inferred from `numWorkers=0` + instant completion, rather than adding a `from_cache` flag to protobuf messages. This minimizes protocol changes. +### 6. Explicit Cache State in Metrics +Cached operators use a dedicated `COMPLETED_FROM_CACHE` state in `WorkflowAggregatedState` protobuf enum. This provides clear visual feedback to users and distinguishes cache-hit completion from normal execution completion. ### 7. Deferred Lifecycle Management V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research topics for future work, not implementation blockers. @@ -119,6 +119,8 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached - `numWorkers = 0` - `dataProcessingTime = 0`, `controlProcessingTime = 0`, `idleTime = 0` - Input/output tuple counts from cached metadata + - **UI**: The graph view displays `-` for cached input counts and for cached output ports that were not materialized. + - **Note**: Cached stats are synthetic (inputs default to 0; non-materialized outputs may be omitted). Do not use them for cost modeling until we add explicit tagging/filtering in `runtime_statistics`. 5. **Propagate cached URIs**: Downstream operators receive cached `result_uri` for materialized inputs 6. **No WorkerAssignmentUpdate**: Cached regions don't send worker assignment events (consistent with numWorkers=0) 7. **Set phase to Completed**: Region lifecycle completes immediately @@ -292,6 +294,16 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - ToSkip regions: `completeCachedRegion()` records cached operator metrics (numWorkers=0, processingTime=0) without creating workers, propagates cached URIs downstream - ToExecute regions: normal execution path - **Stats emission**: Cached regions emit `ExecutionStatsUpdate` via direct client updates, maintaining consistency with normal execution lifecycle +- **Frontend cache visualization** (Phase 1.2 - Complete): + - Added `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf enum + - Added `CompletedFromCache` phase to `RegionExecutionCoordinator` for cached region lifecycle + - Backend state conversion in `Utils.scala`: `aggregatedStateToString`, `stringToAggregatedState`, `maptoStatusCode` + - State aggregation in `ExecutionUtils.aggregateStates()` handles `COMPLETED_FROM_CACHE` as terminal state + - Frontend `OperatorState` enum includes `CompletedFromCache` + - Operator visualization: blue fill color (`#1890ff`) for cached operators in `joint-ui.service.ts` + - Port metrics display: cached input counts show `-`, and cached output ports without materialization show `-` + - Region visualization: blue fill (`rgba(24,144,255,0.3)`) for cached regions in `workflow-editor.component.ts` + - Region visibility: shared state via `WorkflowActionService.showRegion` ensures correct visibility when regions are created during execution ### Architecture Integration The cache system integrates with three layers: @@ -390,7 +402,29 @@ The cache system integrates with three layers: - [ ] Add unit tests for DAO operations - [ ] (Optional) Add REST endpoints in `WorkflowExecutionsResource` that delegate to service -#### 1.2 Testing & Validation +#### 1.2 Frontend Cache Visualization ✓ COMPLETE +- [x] Add `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf enum + - Location: `/amber/src/main/protobuf/.../controlreturns.proto` +- [x] Add `CompletedFromCache` phase to `RegionExecutionCoordinator` + - Cached regions use this phase instead of `Completed` + - Location: `/amber/src/main/scala/.../scheduling/RegionExecutionCoordinator.scala` +- [x] Update backend state conversion functions in `Utils.scala` + - `aggregatedStateToString`: `COMPLETED_FROM_CACHE` → `"CompletedFromCache"` + - `stringToAggregatedState`: `"completedfromcache"` → `COMPLETED_FROM_CACHE` + - `maptoStatusCode`: `COMPLETED_FROM_CACHE` → `6` +- [x] Update `ExecutionUtils.aggregateStates()` to handle `COMPLETED_FROM_CACHE` + - Treated as terminal state alongside `COMPLETED` and `TERMINATED` +- [x] Add `CompletedFromCache` to frontend `OperatorState` enum + - Location: `/frontend/src/app/workspace/types/execute-workflow.interface.ts` +- [x] Add blue fill color for cached operators in `joint-ui.service.ts` + - Color: `#1890ff` (Ant Design blue) +- [x] Add blue fill for cached regions in `workflow-editor.component.ts` + - Color: `rgba(24,144,255,0.3)` (translucent blue) +- [x] Show `-` for cached input counts and cached output ports without materialization +- [x] Fix region visibility with shared state via `WorkflowActionService.showRegion` + - Ensures regions show correctly when user toggles visibility before execution + +#### 1.3 Testing & Validation - [ ] Verify downstream cached URI consumption across all operator types - [ ] Add integration tests: cache upsert → DB verification - [ ] Add E2E tests: run → cache → rerun → verify skip diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts index 4f9b984b4f..a07606611f 100644 --- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts +++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts @@ -320,6 +320,8 @@ export class JointUIService { const inputMetrics = statistics.inputPortMetrics; const outputMetrics = statistics.outputPortMetrics; + // Cached operators show "-" for inputs and non-materialized outputs. + const isSkippedFromCache = statistics.operatorState === OperatorState.CompletedFromCache; const workerCount = statistics.numWorkers ?? 1; element.attr(`.${operatorWorkerCountClass}/text`, "#workers: " + String(workerCount)); @@ -330,7 +332,7 @@ export class JointUIService { const parts = portId.split("-"); const numericSuffix = parts.length > 1 ? parts[1] : portId; - const count: number = inputMetrics[numericSuffix] ?? 0; + const count = inputMetrics[numericSuffix]; const rawAttrs = (portDef.attrs as any) || {}; const oldText: string = (rawAttrs[".port-label"] && rawAttrs[".port-label"].text) || ""; let originalName = oldText.includes(":") ? oldText.split(":", 1)[0].trim() : oldText; @@ -339,7 +341,7 @@ export class JointUIService { originalName = portId; } - const labelText = `${count}`; + const labelText = isSkippedFromCache ? "-" : String(count ?? 0); element.portProp(portId, "attrs/.port-label/text", labelText); } }); @@ -350,7 +352,7 @@ export class JointUIService { const parts = portId.split("-"); const numericSuffix = parts.length > 1 ? parts[1] : portId; - const count: number = outputMetrics[numericSuffix] ?? 0; + const count = outputMetrics[numericSuffix]; const rawAttrs = (portDef.attrs as any) || {}; const oldText: string = (rawAttrs[".port-label"] && rawAttrs[".port-label"].text) || ""; let originalName = oldText.includes(":") ? oldText.split(":", 1)[0].trim() : oldText; @@ -359,7 +361,8 @@ export class JointUIService { originalName = portId; } - const labelText = `${count}`; + const labelText = + isSkippedFromCache && count === undefined ? "-" : String(count ?? 0); element.portProp(portId, "attrs/.port-label/text", labelText); }
