This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 37099c2a5e80220ba207a072943bc54d1d156b1f Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Jan 19 18:59:17 2026 -0800 feat(cache): add auto invalidation toggle and auto-update of cache entries. --- .../websocket/event/CacheEntryUpdateEvent.scala | 35 ++++++++++++++++ .../websocket/event/TexeraWebSocketEvent.scala | 1 + .../texera/web/service/ExecutionCacheService.scala | 34 ++++++++++++++-- .../web/service/WorkflowExecutionService.scala | 13 +++++- .../storage/ExecutionCacheEntryUpdateStore.scala | 30 ++++++++++++++ .../texera/web/storage/ExecutionStateStore.scala | 4 +- docs/operator-port-cache.md | 15 ++++--- .../cache-panel/cache-panel.component.html | 7 ++++ .../cache-panel/cache-panel.component.ts | 46 ++++++++++++++++++++-- .../workflow-editor/workflow-editor.component.ts | 43 +++++++++++++++++--- .../compile-workflow/workflow-compiling.service.ts | 4 ++ .../workspace/service/joint-ui/joint-ui.service.ts | 33 ++++++++++------ .../service/workflow-status/cache-usage.service.ts | 6 ++- .../workflow-cache-entries.service.ts | 43 +++++++++++++++++++- .../types/workflow-websocket.interface.ts | 8 ++++ 15 files changed, 286 insertions(+), 36 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheEntryUpdateEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheEntryUpdateEvent.scala new file mode 100644 index 0000000000..44ac31df6a --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheEntryUpdateEvent.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.model.websocket.event + +/** + * Websocket event emitted when a cached output entry is upserted. + * + * @param globalPortId Serialized GlobalPortIdentity string + * @param subdagHash SHA-256 hash of the upstream subDAG fingerprint + * @param tupleCount Cached tuple count (optional) + * @param sourceExecutionId Execution id that produced the cached output + */ +case class CacheEntryUpdateEvent( + globalPortId: String, + subdagHash: String, + tupleCount: Option[Long], + sourceExecutionId: Long +) extends TexeraWebSocketEvent diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala index a4953c3e1e..6a8bcf6c80 100644 --- a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala @@ -36,6 +36,7 @@ import org.apache.texera.web.model.websocket.response.{HeartBeatResponse, Modify new Type(value = classOf[ConsoleUpdateEvent]), new Type(value = classOf[CacheStatusUpdateEvent]), new Type(value = classOf[CacheUsageUpdateEvent]), + new Type(value = classOf[CacheEntryUpdateEvent]), new Type(value = classOf[PaginatedResultEvent]), new Type(value = classOf[PythonExpressionEvaluateResponse]), new Type(value = classOf[WorkerAssignmentUpdateEvent]), diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala index ee5ceeafe9..2f7d3d912a 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala @@ -20,25 +20,32 @@ package org.apache.texera.web.service import com.typesafe.scalalogging.LazyLogging -import org.apache.texera.amber.core.workflow.{PhysicalPlan, WorkflowContext} +import org.apache.texera.amber.core.workflow.cache.FingerprintUtil +import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalPlan, WorkflowContext} import org.apache.texera.amber.engine.architecture.controller.PortMaterialized import org.apache.texera.amber.engine.common.client.AmberClient +import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps import org.apache.texera.web.SubscriptionManager +import org.apache.texera.web.model.websocket.event.CacheEntryUpdateEvent +import org.apache.texera.web.storage.{ExecutionCacheEntryUpdateStore, ExecutionStateStore} /** * Service that listens for port materialization events from the controller - * and persists cache metadata. + * and persists cache metadata. It also emits cache entry updates so the + * frontend can refresh cache info as new cached results appear. * * @param client AmberClient to register callbacks * @param cacheService OperatorPortCacheService for cache persistence * @param workflowContext WorkflowContext for workflow/execution IDs * @param physicalPlan PhysicalPlan for fingerprint computation + * @param executionStateStore Execution state store for cache entry updates */ class ExecutionCacheService( client: AmberClient, cacheService: OperatorPortCacheService, workflowContext: WorkflowContext, - physicalPlan: PhysicalPlan + physicalPlan: PhysicalPlan, + executionStateStore: ExecutionStateStore ) extends SubscriptionManager with LazyLogging { @@ -60,6 +67,7 @@ class ExecutionCacheService( evt.resultUri, evt.tupleCount ) + emitCacheEntryUpdate(evt.portId, evt.tupleCount) } catch { case e: Throwable => logger.error(s"Failed to upsert cache for port ${evt.portId}", e) @@ -67,4 +75,24 @@ class ExecutionCacheService( }) ) } + + /** + * Emits a cache entry update after a cache upsert so websocket clients + * can refresh cache metadata for the current execution. + */ + private def emitCacheEntryUpdate( + portId: GlobalPortIdentity, + tupleCount: Option[Long] + ): Unit = { + val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId) + val updateEvent = CacheEntryUpdateEvent( + globalPortId = portId.serializeAsString, + subdagHash = fingerprint.subdagHash, + tupleCount = tupleCount, + sourceExecutionId = workflowContext.executionId.id + ) + executionStateStore.cacheEntryUpdateStore.updateState(_ => + ExecutionCacheEntryUpdateStore(Some(updateEvent), System.currentTimeMillis()) + ) + } } diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala index 4e0e841645..9a9a8a11b9 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala @@ -95,6 +95,11 @@ class WorkflowExecutionService( Iterable(CacheUsageUpdateEvent(newState.cachedOutputs)) }) ) + addSubscription( + executionStateStore.cacheEntryUpdateStore.registerDiffHandler((_, newState) => { + newState.lastUpdate.toList + }) + ) private def createStateEvent(state: ExecutionMetadataStore): WorkflowStateEvent = { if (state.isRecovering && state.state != COMPLETED) { @@ -185,7 +190,13 @@ class WorkflowExecutionService( new ExecutionReconfigurationService(client, executionStateStore, workflow) executionStatsService = new ExecutionStatsService(client, executionStateStore, workflow.context) executionCacheService = - new ExecutionCacheService(client, cacheService, workflow.context, workflow.physicalPlan) + new ExecutionCacheService( + client, + cacheService, + workflow.context, + workflow.physicalPlan, + executionStateStore + ) executionRuntimeService = new ExecutionRuntimeService( client, executionStateStore, diff --git a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheEntryUpdateStore.scala b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheEntryUpdateStore.scala new file mode 100644 index 0000000000..14bc324f75 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheEntryUpdateStore.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.storage + +import org.apache.texera.web.model.websocket.event.CacheEntryUpdateEvent + +/** + * Tracks the latest cache upsert so websocket clients can refresh cache metadata. + */ +case class ExecutionCacheEntryUpdateStore( + lastUpdate: Option[CacheEntryUpdateEvent] = None, + updatedAt: Long = System.currentTimeMillis() +) diff --git a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala index d307c8c021..6a6539db0e 100644 --- a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala +++ b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala @@ -56,6 +56,7 @@ class ExecutionStateStore { val breakpointStore = new StateStore(ExecutionBreakpointStore()) val reconfigurationStore = new StateStore(ExecutionReconfigurationStore()) val cacheUsageStore = new StateStore(ExecutionCacheUsageStore()) + val cacheEntryUpdateStore = new StateStore(ExecutionCacheEntryUpdateStore()) /** * Returns all state stores that should publish websocket updates for an execution. @@ -67,7 +68,8 @@ class ExecutionStateStore { breakpointStore, metadataStore, reconfigurationStore, - cacheUsageStore + cacheUsageStore, + cacheEntryUpdateStore ) } } diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index 36c750910a..a98e9801c7 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -188,7 +188,7 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached **Cache metadata UI (Implemented)**: - Left panel "Cache" tab listing workflow cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) -- Highlight cache entries usable by the current execution (fingerprint match) +- Highlight cache entries usable by the current execution (fingerprint + source execution id match) - Cache panel toggle to show only entries usable by the current execution - Cache panel action to clear all cached results (deletes cache entries, result documents, and port result records) - Output port labels show tuple counts, plus a second line with source execution id for cached outputs @@ -202,15 +202,18 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached - Context menu actions: "Clear cache" (selected operator) and "Clear cache up to this operator" (includes disabled operators and the selected operator) - Cache invalidation on compile: evict cache entries whose fingerprints no longer match the current workflow - Cache panel shows a notice when auto-invalidation removes entries after a compile +- Cache panel toggle to enable or disable auto invalidation after compile - Cache panel shows a notice when users manually clear or evict cache entries (panel or context menu) +- Cache panel auto-refreshes when cache entries are upserted or an execution completes - Compile endpoint accepts HashJoin join types (e.g., "full outer") to avoid 400s during invalidation - TODO: Use source execution runtime stats for cached operator input/output counts, with fallback to `operator_port_cache.tuple_count` when stats are missing **Cache usage updates**: -- `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint match) -- Frontend uses the event to drive cache entry highlighting and per-port cache labels +- `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint + source execution id match) +- Frontend uses the event to drive cache entry highlighting and per-port cache labels; matching includes source execution id to avoid treating new upserts as usable - Cache usage snapshots are re-emitted on websocket connect to keep labels visible after refresh +- `CacheEntryUpdateEvent` is emitted when cached outputs are upserted during execution ### 5. Service & DAO Architecture @@ -383,9 +386,9 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - Region visualization: blue fill (`rgba(24,144,255,0.3)`) for cached regions in `workflow-editor.component.ts` - Region visibility: shared state via `WorkflowActionService.showRegion` ensures correct visibility when regions are created during execution - **Cache metadata UI** (Phase 1.3 - Complete): - - `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint match) + - `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint + source execution id match) - Left panel "Cache" tab lists cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) - - Cache entries highlight when usable by the current execution (fingerprint match) + - Cache entries highlight when usable by the current execution (fingerprint + source execution id match) - Cache panel "Clear cache" action removes cached results and associated result artifacts - Cache panel can filter to only show entries usable by the current execution - Cached output ports show source execution id on a second label line @@ -528,7 +531,7 @@ The cache system integrates with three layers: #### 1.3 Cache Metadata UI ✓ COMPLETE - [X] Add left panel "Cache" tab listing workflow cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) -- [x] Highlight cache entries usable by the current execution (fingerprint match) +- [x] Highlight cache entries usable by the current execution (fingerprint + source execution id match) - [X] Show per-output-port sourceExecutionId on a second output port label line - [X] Re-emit cache usage snapshots on websocket connect to refresh cache labels after reload - [X] Keep result URI hidden in the UI diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html index 78db959644..7d0bedb6c0 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html @@ -38,6 +38,13 @@ nzPopconfirmTitle="Remove all cached results for this workflow?"> Clear cache </button> + <button + nz-button + nzSize="small" + nzType="default" + (click)="toggleAutoInvalidation()"> + Auto invalidation: {{ autoInvalidationEnabled ? 'On' : 'Off' }} + </button> <label class="cache-panel__toggle"> <nz-switch nzSize="small" diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts index e8023b59c1..f44d48f6b9 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts @@ -22,7 +22,9 @@ import { ActivatedRoute } from "@angular/router"; import { finalize, tap } from "rxjs/operators"; import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; import { WorkflowCacheEntry } from "../../../../dashboard/type/workflow-cache-entry"; +import { ExecuteWorkflowService } from "../../../service/execute-workflow/execute-workflow.service"; import { CacheUsageService } from "../../../service/workflow-status/cache-usage.service"; +import { ExecutionState } from "../../../types/execute-workflow.interface"; import { CacheInvalidationNotice, CacheManualClearNotice, @@ -44,6 +46,8 @@ export class CachePanelComponent implements OnInit { public visibleEntries: WorkflowCacheEntry[] = []; /** When true, only cache entries usable by the current execution are shown. */ public showUsableOnly = false; + /** When true, auto invalidation runs after successful compilation. */ + public autoInvalidationEnabled = true; /** True while the cache eviction request is in flight. */ public removing = false; public loading = false; @@ -57,6 +61,7 @@ export class CachePanelComponent implements OnInit { constructor( private cacheEntriesService: WorkflowCacheEntriesService, private cacheUsageService: CacheUsageService, + private executeWorkflowService: ExecuteWorkflowService, private route: ActivatedRoute ) {} @@ -85,7 +90,13 @@ export class CachePanelComponent implements OnInit { .pipe(untilDestroyed(this)) .subscribe(entries => { this.usageKeys = new Set( - entries.map(entry => this.cacheUsageService.buildUsageKey(entry.globalPortId, entry.subdagHash)) + entries.map(entry => + this.cacheUsageService.buildUsageKey( + entry.globalPortId, + entry.subdagHash, + entry.sourceExecutionId + ) + ) ); this.updateVisibleEntries(); }); @@ -109,6 +120,20 @@ export class CachePanelComponent implements OnInit { this.manualClearNotice = undefined; } }); + this.cacheEntriesService + .getAutoInvalidationEnabledStream() + .pipe(untilDestroyed(this)) + .subscribe(enabled => { + this.autoInvalidationEnabled = enabled; + }); + this.executeWorkflowService + .getExecutionStateStream() + .pipe(untilDestroyed(this)) + .subscribe(({ current }) => { + if (current.state === ExecutionState.Completed) { + this.refresh(); + } + }); } /** @@ -153,11 +178,17 @@ export class CachePanelComponent implements OnInit { } /** - * Returns true when a cache entry is usable by the current execution (fingerprint match), - * regardless of whether the scheduler chooses to reuse it. + * Returns true when a cache entry is usable by the current execution + * (fingerprint and source execution match), regardless of whether the scheduler chooses to reuse it. */ public isUsableForExecution(entry: WorkflowCacheEntry): boolean { - return this.usageKeys.has(this.cacheUsageService.buildUsageKey(entry.globalPortId, entry.subdagHash)); + return this.usageKeys.has( + this.cacheUsageService.buildUsageKey( + entry.globalPortId, + entry.subdagHash, + entry.sourceExecutionId + ) + ); } /** @@ -169,6 +200,13 @@ export class CachePanelComponent implements OnInit { : this.cacheEntries; } + /** + * Toggles auto invalidation of cached results after compilation. + */ + public toggleAutoInvalidation(): void { + this.cacheEntriesService.toggleAutoInvalidation(); + } + /** * Formats tuple counts for display. */ diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index cc0126df69..1d8319b596 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -139,7 +139,30 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy magnet: SVGElement, event: joint.dia.Event ): boolean { - return magnet && magnet.getAttribute("port-group") === "out"; + const portGroup = WorkflowEditorComponent.getMagnetAttribute(magnet, "port-group"); + return portGroup === "out"; + } + + /** + * Resolves a port attribute from a magnet or its nearest port element. + * This keeps port interactions working even when ports have extra markup. + */ + private static getMagnetAttribute( + magnet: SVGElement | null | undefined, + attribute: "port" | "port-group" + ): string | null { + if (!magnet) { + return null; + } + const direct = magnet.getAttribute(attribute); + if (direct) { + return direct; + } + const closest = magnet.closest(`[${attribute}]`); + if (closest instanceof SVGElement) { + return closest.getAttribute(attribute); + } + return null; } ngAfterViewInit() { @@ -924,9 +947,14 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy // set the multi-select mode this.wrapper.setMultiSelectMode(<boolean>event[1].shiftKey); + const portID = WorkflowEditorComponent.getMagnetAttribute(event[2] as SVGElement, "port"); + if (!portID) { + return; + } + const clickedPortID: LogicalPort = { operatorID: event[0].model.id as string, - portID: event[2].getAttribute("port") as string, + portID: portID, }; if (event[1].shiftKey) { @@ -1038,20 +1066,23 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy end: joint.dia.LinkEnd, linkView: joint.dia.LinkView ): boolean { + const sourcePortGroup = WorkflowEditorComponent.getMagnetAttribute(sourceMagnet, "port-group"); + const targetPortGroup = WorkflowEditorComponent.getMagnetAttribute(targetMagnet, "port-group"); + // user cannot draw connection starting from the input port (left side) - if (sourceMagnet && sourceMagnet.getAttribute("port-group") === "in") { + if (sourcePortGroup === "in") { return false; } // user cannot connect to the output port (right side) - if (targetMagnet && targetMagnet.getAttribute("port-group") === "out") { + if (targetPortGroup === "out") { return false; } const sourceCellID = sourceView.model.id.toString(); - const sourcePortID = sourceMagnet?.getAttribute("port"); + const sourcePortID = WorkflowEditorComponent.getMagnetAttribute(sourceMagnet, "port"); const targetCellID = targetView.model.id.toString(); - const targetPortID = targetMagnet?.getAttribute("port"); + const targetPortID = WorkflowEditorComponent.getMagnetAttribute(targetMagnet, "port"); return this.validateOperatorConnection(sourceCellID, sourcePortID, targetCellID, targetPortID); } diff --git a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts index 02a5e30d93..d2beccb2ee 100644 --- a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts +++ b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts @@ -323,6 +323,7 @@ export class WorkflowCompilingService { * Triggers cache invalidation after a successful compilation. * Cache entries with mismatched fingerprints are removed on the backend, and * the cache panel is notified when entries are actually removed. + * This is skipped when auto invalidation is disabled in the cache panel. */ private invalidateMismatchedCacheEntries(logicalPlan: LogicalPlan): void { const workflowId = this.workflowActionService.getWorkflowMetadata().wid; @@ -333,6 +334,9 @@ export class WorkflowCompilingService { ) { return; } + if (!this.cacheEntriesService.isAutoInvalidationEnabled()) { + return; + } const beforeKeys = new Set( this.cacheEntriesService.getCacheEntriesSnapshot().map(entry => this.cacheEntriesService.buildEntryKey(entry)) ); diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts index 7f098a3e10..d93110ebf2 100644 --- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts +++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts @@ -427,7 +427,7 @@ export class JointUIService { } /** - * Updates cached output port indicator rings without changing counts or labels. + * Updates cached output port indicator badges without changing counts or labels. */ public changeOperatorCachedPorts( jointPaper: joint.dia.Paper, @@ -692,7 +692,8 @@ export class JointUIService { /** * This function changes the default svg of the operator ports. * It hides the port label that will display 'out/in' beside the operators. - * Port labels remain visible for per-port metrics and cache metadata. + * Port labels remain visible for per-port metrics and cache metadata, and cached + * output ports show a small badge that never captures pointer events so ports stay interactive. * * @returns the custom attributes of the ports */ @@ -702,13 +703,15 @@ export class JointUIService { fill: "#A0A0A0", r: 5, stroke: "none", + "pointer-events": "all", }, ".port-cache-indicator": { - fill: "none", - r: 7, - stroke: "#1890ff", - "stroke-width": 1.5, + fill: "#fadb14", + points: "0,-12 4,-9 0,-6 -4,-9", + stroke: "#ad8b00", + "stroke-width": 1, display: "none", + "pointer-events": "none", }, ".port-label": { visibility: "visible", @@ -718,27 +721,31 @@ export class JointUIService { ref: ".port-body", "ref-y": 0.5, "y-alignment": "middle", + "pointer-events": "none", }, }; } /** - * Defines the default markup for ports. + * Defines the default markup for ports, including the cache badge and port body. */ public static getCustomPortMarkup(): any[] { return [ { tagName: "circle", - selector: ".port-cache-indicator", + selector: ".port-body", attributes: { - class: "port-cache-indicator", + class: "port-body", + magnet: true, }, }, { - tagName: "circle", - selector: ".port-body", + tagName: "polygon", + selector: ".port-cache-indicator", attributes: { - class: "port-body", + class: "port-cache-indicator", + magnet: false, + "pointer-events": "none", }, }, ]; @@ -754,6 +761,7 @@ export class JointUIService { selector: ".port-label", attributes: { class: "port-label", + "pointer-events": "none", }, }, { @@ -761,6 +769,7 @@ export class JointUIService { selector: ".port-cache-label", attributes: { class: "port-cache-label", + "pointer-events": "none", }, }, ]; diff --git a/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts b/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts index 59f10e8d54..99c89ce848 100644 --- a/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts +++ b/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts @@ -68,9 +68,11 @@ export class CacheUsageService { /** * Builds a stable key to match cache entries against cache usage updates. + * Includes the source execution to avoid tagging replaced cache entries as usable. */ - public buildUsageKey(globalPortId: string, subdagHash: string): string { - return `${globalPortId}|${subdagHash}`; + public buildUsageKey(globalPortId: string, subdagHash: string, sourceExecutionId?: number): string { + const executionToken = sourceExecutionId == null ? "unknown" : sourceExecutionId.toString(); + return `${globalPortId}|${subdagHash}|${executionToken}`; } private registerCacheUsageListener(): void { diff --git a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts index 96731e3f92..852f9fddfb 100644 --- a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts +++ b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts @@ -19,9 +19,10 @@ import { Injectable } from "@angular/core"; import { BehaviorSubject, Observable } from "rxjs"; -import { finalize, tap } from "rxjs/operators"; +import { auditTime, finalize, tap } from "rxjs/operators"; import { WorkflowExecutionsService } from "../../../dashboard/service/user/workflow-executions/workflow-executions.service"; import { WorkflowCacheEntry } from "../../../dashboard/type/workflow-cache-entry"; +import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service"; import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; export interface CacheInvalidationNotice { @@ -48,10 +49,13 @@ export class WorkflowCacheEntriesService { private readonly loadingSubject = new BehaviorSubject<boolean>(false); private readonly invalidationNoticeSubject = new BehaviorSubject<CacheInvalidationNotice | undefined>(undefined); private readonly manualClearNoticeSubject = new BehaviorSubject<CacheManualClearNotice | undefined>(undefined); + /** Whether compile-time cache invalidation should run automatically. */ + private readonly autoInvalidationEnabledSubject = new BehaviorSubject<boolean>(true); private currentWorkflowId?: number; constructor( private workflowExecutionsService: WorkflowExecutionsService, + private workflowWebsocketService: WorkflowWebsocketService, private workflowActionService: WorkflowActionService ) { this.workflowActionService.workflowMetaDataChanged().subscribe(metadata => { @@ -73,6 +77,15 @@ export class WorkflowCacheEntriesService { this.currentWorkflowId = initialWorkflowId; this.refreshCacheEntries(initialWorkflowId).subscribe(); } + + this.workflowWebsocketService + .subscribeToEvent("CacheEntryUpdateEvent") + .pipe(auditTime(250)) + .subscribe(() => { + if (this.currentWorkflowId && this.currentWorkflowId > 0) { + this.refreshCacheEntries(this.currentWorkflowId).subscribe(); + } + }); } /** @@ -103,6 +116,13 @@ export class WorkflowCacheEntriesService { return this.manualClearNoticeSubject.asObservable(); } + /** + * Returns a stream of the auto-invalidation toggle state. + */ + public getAutoInvalidationEnabledStream(): Observable<boolean> { + return this.autoInvalidationEnabledSubject.asObservable(); + } + /** * Returns the latest cache entry snapshot. */ @@ -110,6 +130,27 @@ export class WorkflowCacheEntriesService { return this.cacheEntriesSubject.value; } + /** + * Returns true when auto invalidation on compile is enabled. + */ + public isAutoInvalidationEnabled(): boolean { + return this.autoInvalidationEnabledSubject.value; + } + + /** + * Updates the auto-invalidation toggle state. + */ + public setAutoInvalidationEnabled(enabled: boolean): void { + this.autoInvalidationEnabledSubject.next(enabled); + } + + /** + * Flips the auto-invalidation toggle state. + */ + public toggleAutoInvalidation(): void { + this.setAutoInvalidationEnabled(!this.autoInvalidationEnabledSubject.value); + } + /** * Refreshes cache entries for a workflow and updates shared state. * diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts b/frontend/src/app/workspace/types/workflow-websocket.interface.ts index e22633f213..a6c953e640 100644 --- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts +++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts @@ -157,6 +157,13 @@ export interface CacheUsageUpdateEvent cachedOutputs: ReadonlyArray<CachedPortUsage>; }> {} +export type CacheEntryUpdateEvent = Readonly<{ + globalPortId: string; + subdagHash: string; + tupleCount?: number; + sourceExecutionId: number; +}>; + export type PythonExpressionEvaluateRequest = Readonly<{ expression: string; operatorId: string; @@ -252,6 +259,7 @@ export type TexeraWebsocketEventTypeMap = { WorkflowAvailableResultEvent: WorkflowAvailableResultEvent; CacheStatusUpdateEvent: CacheStatusUpdateEvent; CacheUsageUpdateEvent: CacheUsageUpdateEvent; + CacheEntryUpdateEvent: CacheEntryUpdateEvent; PythonExpressionEvaluateResponse: PythonExpressionEvaluateResponse; WorkerAssignmentUpdateEvent: WorkerAssignmentUpdateEvent; ModifyLogicResponse: ModifyLogicResponse;
