This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 7a098977afc0587baacfe3a28eb3264bd7f2b2dc Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Jan 19 16:17:08 2026 -0800 feat(cache): add notifications for cache invalidation and manual clear cache. --- docs/operator-port-cache.md | 3 + .../cache-panel/cache-panel.component.html | 22 ++++++ .../cache-panel/cache-panel.component.scss | 4 + .../cache-panel/cache-panel.component.ts | 91 +++++++++++++++++++++- .../context-menu/context-menu.component.ts | 53 ++++++++++++- .../compile-workflow/workflow-compiling.service.ts | 17 +++- .../workflow-cache-entries.service.ts | 71 +++++++++++++++++ 7 files changed, 252 insertions(+), 9 deletions(-) diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index 775d06e6e8..36c750910a 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -201,6 +201,9 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached - Output ports show a cached indicator when any cache entry exists for that port (no usable/not-usable distinction on the graph) - Context menu actions: "Clear cache" (selected operator) and "Clear cache up to this operator" (includes disabled operators and the selected operator) - Cache invalidation on compile: evict cache entries whose fingerprints no longer match the current workflow +- Cache panel shows a notice when auto-invalidation removes entries after a compile +- Cache panel shows a notice when users manually clear or evict cache entries (panel or context menu) +- Compile endpoint accepts HashJoin join types (e.g., "full outer") to avoid 400s during invalidation - TODO: Use source execution runtime stats for cached operator input/output counts, with fallback to `operator_port_cache.tuple_count` when stats are missing **Cache usage updates**: diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html index 785bdb1282..78db959644 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html @@ -54,6 +54,28 @@ </span> </div> + <nz-alert + *ngIf="invalidationNotice" + class="cache-panel__notice" + nzType="info" + nzShowIcon + nzCloseable + (nzOnClose)="clearInvalidationNotice()" + [nzMessage]="formatInvalidationMessage(invalidationNotice)" + [nzDescription]="formatInvalidationTimestamp(invalidationNotice)"> + </nz-alert> + + <nz-alert + *ngIf="manualClearNotice" + class="cache-panel__notice" + nzType="success" + nzShowIcon + nzCloseable + (nzOnClose)="clearManualClearNotice()" + [nzMessage]="formatManualClearMessage(manualClearNotice)" + [nzDescription]="formatManualClearTimestamp(manualClearNotice)"> + </nz-alert> + <nz-table nzSize="small" [nzSimple]="true" diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss index d6e7a58ffb..e760c340e2 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss @@ -26,6 +26,10 @@ font-size: 12px; } +.cache-panel__notice { + margin-top: 4px; +} + .cache-port__op { font-weight: 600; } diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts index 967d9e298e..e8023b59c1 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts @@ -19,11 +19,15 @@ import { Component, OnInit } from "@angular/core"; import { ActivatedRoute } from "@angular/router"; -import { finalize } from "rxjs/operators"; +import { finalize, tap } from "rxjs/operators"; import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; import { WorkflowCacheEntry } from "../../../../dashboard/type/workflow-cache-entry"; import { CacheUsageService } from "../../../service/workflow-status/cache-usage.service"; -import { WorkflowCacheEntriesService } from "../../../service/workflow-status/workflow-cache-entries.service"; +import { + CacheInvalidationNotice, + CacheManualClearNotice, + WorkflowCacheEntriesService, +} from "../../../service/workflow-status/workflow-cache-entries.service"; /** * CachePanelComponent renders cache entry metadata for the current workflow. @@ -43,6 +47,10 @@ export class CachePanelComponent implements OnInit { /** True while the cache eviction request is in flight. */ public removing = false; public loading = false; + /** Latest auto-invalidation notice shown in the cache panel. */ + public invalidationNotice?: CacheInvalidationNotice; + /** Latest manual cache-clear notice shown in the cache panel. */ + public manualClearNotice?: CacheManualClearNotice; private workflowId?: number; private usageKeys = new Set<string>(); @@ -81,6 +89,26 @@ export class CachePanelComponent implements OnInit { ); this.updateVisibleEntries(); }); + this.cacheEntriesService + .getInvalidationNoticeStream() + .pipe(untilDestroyed(this)) + .subscribe(notice => { + if (notice && this.workflowId && notice.workflowId === this.workflowId) { + this.invalidationNotice = notice; + } else if (!notice) { + this.invalidationNotice = undefined; + } + }); + this.cacheEntriesService + .getManualClearNoticeStream() + .pipe(untilDestroyed(this)) + .subscribe(notice => { + if (notice && this.workflowId && notice.workflowId === this.workflowId) { + this.manualClearNotice = notice; + } else if (!notice) { + this.manualClearNotice = undefined; + } + }); } /** @@ -94,16 +122,28 @@ export class CachePanelComponent implements OnInit { } /** - * Removes all cached outputs for the workflow and updates shared cache state. + * Removes all cached outputs for the workflow, updates shared cache state, + * and shows a cache-clear notification on success. */ public clearCacheEntries(): void { if (!this.workflowId) { return; } + const removedCount = this.cacheEntries.length; this.removing = true; this.cacheEntriesService .clearWorkflowCacheEntries(this.workflowId) .pipe( + tap(() => { + const entryLabel = removedCount === 1 ? "entry" : "entries"; + const message = removedCount === 0 ? "Cache cleared." : `Cleared ${removedCount} cache ${entryLabel}.`; + this.cacheEntriesService.notifyManualClear({ + workflowId: this.workflowId!, + message, + removedCount, + timestamp: new Date(), + }); + }), finalize(() => { this.removing = false; }), @@ -149,4 +189,49 @@ export class CachePanelComponent implements OnInit { public shortenSubdagHash(hash: string): string { return hash.length > 8 ? hash.slice(0, 8) : hash; } + + /** + * Formats the auto-invalidation notification message. + */ + public formatInvalidationMessage(notice: CacheInvalidationNotice): string { + const entryLabel = notice.removedCount === 1 ? "entry" : "entries"; + return `Auto-removed ${notice.removedCount} cache ${entryLabel} after workflow changes.`; + } + + /** + * Formats the auto-invalidation notification timestamp for display. + */ + public formatInvalidationTimestamp(notice: CacheInvalidationNotice): string { + return `At ${notice.timestamp.toLocaleString()}`; + } + + /** + * Clears the cache invalidation notice from the panel. + */ + public clearInvalidationNotice(): void { + this.invalidationNotice = undefined; + this.cacheEntriesService.clearInvalidationNotice(); + } + + /** + * Formats the manual cache-clear notification message. + */ + public formatManualClearMessage(notice: CacheManualClearNotice): string { + return notice.message; + } + + /** + * Formats the manual cache-clear notification timestamp for display. + */ + public formatManualClearTimestamp(notice: CacheManualClearNotice): string { + return `At ${notice.timestamp.toLocaleString()}`; + } + + /** + * Clears the manual cache-clear notice from the panel. + */ + public clearManualClearNotice(): void { + this.manualClearNotice = undefined; + this.cacheEntriesService.clearManualClearNotice(); + } } diff --git a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts index 70e9236bc1..b26f41d66a 100644 --- a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts @@ -150,7 +150,7 @@ export class ContextMenuComponent { } /** - * Clears cached outputs produced by the selected operator. + * Clears cached outputs produced by the selected operator and emits a cache panel notice. */ public clearCacheForSelectedOperator(): void { const workflowId = this.workflowActionService.getWorkflowMetadata()?.wid; @@ -158,16 +158,39 @@ export class ContextMenuComponent { return; } const operatorId = this.getSelectedOperatorID(); + const beforeKeys = new Set( + this.cacheEntriesService.getCacheEntriesSnapshot().map(entry => this.cacheEntriesService.buildEntryKey(entry)) + ); this.workflowExecutionsService .evictWorkflowCacheEntries(workflowId, [operatorId]) .pipe(untilDestroyed(this)) .subscribe(() => { - this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(); + this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(entries => { + const afterKeys = new Set(entries.map(entry => this.cacheEntriesService.buildEntryKey(entry))); + let removedCount = 0; + beforeKeys.forEach(key => { + if (!afterKeys.has(key)) { + removedCount += 1; + } + }); + const entryLabel = removedCount === 1 ? "entry" : "entries"; + const message = + removedCount === 0 + ? "Cache cleared for selected operator." + : `Cleared ${removedCount} cache ${entryLabel} for selected operator.`; + this.cacheEntriesService.notifyManualClear({ + workflowId, + message, + removedCount, + timestamp: new Date(), + }); + }); }); } /** - * Clears cached outputs produced by the selected operator and its upstream operators. + * Clears cached outputs produced by the selected operator and its upstream operators, + * and emits a cache panel notice. */ public clearCacheUpToSelectedOperator(): void { const workflowId = this.workflowActionService.getWorkflowMetadata()?.wid; @@ -176,11 +199,33 @@ export class ContextMenuComponent { } const operatorId = this.getSelectedOperatorID(); const upstreamOperatorIds = this.collectUpstreamOperatorIds(operatorId); + const beforeKeys = new Set( + this.cacheEntriesService.getCacheEntriesSnapshot().map(entry => this.cacheEntriesService.buildEntryKey(entry)) + ); this.workflowExecutionsService .evictWorkflowCacheEntries(workflowId, upstreamOperatorIds) .pipe(untilDestroyed(this)) .subscribe(() => { - this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(); + this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(entries => { + const afterKeys = new Set(entries.map(entry => this.cacheEntriesService.buildEntryKey(entry))); + let removedCount = 0; + beforeKeys.forEach(key => { + if (!afterKeys.has(key)) { + removedCount += 1; + } + }); + const entryLabel = removedCount === 1 ? "entry" : "entries"; + const message = + removedCount === 0 + ? "Cache cleared for selected operator and upstream." + : `Cleared ${removedCount} cache ${entryLabel} for selected operator and upstream.`; + this.cacheEntriesService.notifyManualClear({ + workflowId, + message, + removedCount, + timestamp: new Date(), + }); + }); }); } diff --git a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts index 23efe3347f..02a5e30d93 100644 --- a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts +++ b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts @@ -321,7 +321,8 @@ export class WorkflowCompilingService { /** * Triggers cache invalidation after a successful compilation. - * Cache entries with mismatched fingerprints are removed on the backend. + * Cache entries with mismatched fingerprints are removed on the backend, and + * the cache panel is notified when entries are actually removed. */ private invalidateMismatchedCacheEntries(logicalPlan: LogicalPlan): void { const workflowId = this.workflowActionService.getWorkflowMetadata().wid; @@ -332,6 +333,9 @@ export class WorkflowCompilingService { ) { return; } + const beforeKeys = new Set( + this.cacheEntriesService.getCacheEntriesSnapshot().map(entry => this.cacheEntriesService.buildEntryKey(entry)) + ); this.workflowExecutionsService .invalidateWorkflowCacheEntries(workflowId, logicalPlan) .pipe( @@ -341,7 +345,16 @@ export class WorkflowCompilingService { }) ) .subscribe(() => { - this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(); + this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(entries => { + const afterKeys = new Set(entries.map(entry => this.cacheEntriesService.buildEntryKey(entry))); + let removedCount = 0; + beforeKeys.forEach(key => { + if (!afterKeys.has(key)) { + removedCount += 1; + } + }); + this.cacheEntriesService.notifyInvalidation(workflowId, removedCount); + }); }); } diff --git a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts index b2f5ca0597..96731e3f92 100644 --- a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts +++ b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts @@ -24,6 +24,19 @@ import { WorkflowExecutionsService } from "../../../dashboard/service/user/workf import { WorkflowCacheEntry } from "../../../dashboard/type/workflow-cache-entry"; import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; +export interface CacheInvalidationNotice { + workflowId: number; + removedCount: number; + timestamp: Date; +} + +export interface CacheManualClearNotice { + workflowId: number; + message: string; + removedCount?: number; + timestamp: Date; +} + /** * Shares workflow cache entry state across components and keeps it in sync with the backend. */ @@ -33,6 +46,8 @@ import { WorkflowActionService } from "../workflow-graph/model/workflow-action.s export class WorkflowCacheEntriesService { private readonly cacheEntriesSubject = new BehaviorSubject<ReadonlyArray<WorkflowCacheEntry>>([]); private readonly loadingSubject = new BehaviorSubject<boolean>(false); + private readonly invalidationNoticeSubject = new BehaviorSubject<CacheInvalidationNotice | undefined>(undefined); + private readonly manualClearNoticeSubject = new BehaviorSubject<CacheManualClearNotice | undefined>(undefined); private currentWorkflowId?: number; constructor( @@ -74,6 +89,20 @@ export class WorkflowCacheEntriesService { return this.loadingSubject.asObservable(); } + /** + * Returns a stream of cache invalidation notices triggered by compilation. + */ + public getInvalidationNoticeStream(): Observable<CacheInvalidationNotice | undefined> { + return this.invalidationNoticeSubject.asObservable(); + } + + /** + * Returns a stream of manual cache clear/eviction notices. + */ + public getManualClearNoticeStream(): Observable<CacheManualClearNotice | undefined> { + return this.manualClearNoticeSubject.asObservable(); + } + /** * Returns the latest cache entry snapshot. */ @@ -108,6 +137,48 @@ export class WorkflowCacheEntriesService { ); } + /** + * Emits a notification when auto-invalidated cache entries are removed. + */ + public notifyInvalidation(workflowId: number, removedCount: number): void { + if (removedCount <= 0) { + return; + } + this.invalidationNoticeSubject.next({ + workflowId, + removedCount, + timestamp: new Date(), + }); + } + + /** + * Clears the stored invalidation notice so it does not reappear. + */ + public clearInvalidationNotice(): void { + this.invalidationNoticeSubject.next(undefined); + } + + /** + * Emits a notification for manual cache clear or eviction actions. + */ + public notifyManualClear(notice: CacheManualClearNotice): void { + this.manualClearNoticeSubject.next(notice); + } + + /** + * Clears the stored manual cache-clear notice so it does not reappear. + */ + public clearManualClearNotice(): void { + this.manualClearNoticeSubject.next(undefined); + } + + /** + * Builds a stable key for a cache entry so invalidation diffs can be computed. + */ + public buildEntryKey(entry: WorkflowCacheEntry): string { + return `${entry.globalPortId}:${entry.subdagHash}`; + } + /** * Returns cached output port IDs for a logical operator. *
