This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 7a098977afc0587baacfe3a28eb3264bd7f2b2dc
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Jan 19 16:17:08 2026 -0800

    feat(cache): add notifications for cache invalidation and manual clear 
cache.
---
 docs/operator-port-cache.md                        |  3 +
 .../cache-panel/cache-panel.component.html         | 22 ++++++
 .../cache-panel/cache-panel.component.scss         |  4 +
 .../cache-panel/cache-panel.component.ts           | 91 +++++++++++++++++++++-
 .../context-menu/context-menu.component.ts         | 53 ++++++++++++-
 .../compile-workflow/workflow-compiling.service.ts | 17 +++-
 .../workflow-cache-entries.service.ts              | 71 +++++++++++++++++
 7 files changed, 252 insertions(+), 9 deletions(-)

diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index 775d06e6e8..36c750910a 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -201,6 +201,9 @@ Entry point: `RegionExecutionCoordinator` constructor 
branches on `region.cached
 - Output ports show a cached indicator when any cache entry exists for that 
port (no usable/not-usable distinction on the graph)
 - Context menu actions: "Clear cache" (selected operator) and "Clear cache up 
to this operator" (includes disabled operators and the selected operator)
 - Cache invalidation on compile: evict cache entries whose fingerprints no 
longer match the current workflow
+- Cache panel shows a notice when auto-invalidation removes entries after a 
compile
+- Cache panel shows a notice when users manually clear or evict cache entries 
(panel or context menu)
+- Compile endpoint accepts HashJoin join types (e.g., "full outer") to avoid 
400s during invalidation
 - TODO: Use source execution runtime stats for cached operator input/output 
counts, with fallback to `operator_port_cache.tuple_count` when stats are 
missing
 
 **Cache usage updates**:
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
index 785bdb1282..78db959644 100644
--- 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
@@ -54,6 +54,28 @@
     </span>
   </div>
 
+  <nz-alert
+    *ngIf="invalidationNotice"
+    class="cache-panel__notice"
+    nzType="info"
+    nzShowIcon
+    nzCloseable
+    (nzOnClose)="clearInvalidationNotice()"
+    [nzMessage]="formatInvalidationMessage(invalidationNotice)"
+    [nzDescription]="formatInvalidationTimestamp(invalidationNotice)">
+  </nz-alert>
+
+  <nz-alert
+    *ngIf="manualClearNotice"
+    class="cache-panel__notice"
+    nzType="success"
+    nzShowIcon
+    nzCloseable
+    (nzOnClose)="clearManualClearNotice()"
+    [nzMessage]="formatManualClearMessage(manualClearNotice)"
+    [nzDescription]="formatManualClearTimestamp(manualClearNotice)">
+  </nz-alert>
+
   <nz-table
     nzSize="small"
     [nzSimple]="true"
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss
index d6e7a58ffb..e760c340e2 100644
--- 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss
@@ -26,6 +26,10 @@
   font-size: 12px;
 }
 
+.cache-panel__notice {
+  margin-top: 4px;
+}
+
 .cache-port__op {
   font-weight: 600;
 }
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
index 967d9e298e..e8023b59c1 100644
--- 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
@@ -19,11 +19,15 @@
 
 import { Component, OnInit } from "@angular/core";
 import { ActivatedRoute } from "@angular/router";
-import { finalize } from "rxjs/operators";
+import { finalize, tap } from "rxjs/operators";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
 import { WorkflowCacheEntry } from 
"../../../../dashboard/type/workflow-cache-entry";
 import { CacheUsageService } from 
"../../../service/workflow-status/cache-usage.service";
-import { WorkflowCacheEntriesService } from 
"../../../service/workflow-status/workflow-cache-entries.service";
+import {
+  CacheInvalidationNotice,
+  CacheManualClearNotice,
+  WorkflowCacheEntriesService,
+} from "../../../service/workflow-status/workflow-cache-entries.service";
 
 /**
  * CachePanelComponent renders cache entry metadata for the current workflow.
@@ -43,6 +47,10 @@ export class CachePanelComponent implements OnInit {
   /** True while the cache eviction request is in flight. */
   public removing = false;
   public loading = false;
+  /** Latest auto-invalidation notice shown in the cache panel. */
+  public invalidationNotice?: CacheInvalidationNotice;
+  /** Latest manual cache-clear notice shown in the cache panel. */
+  public manualClearNotice?: CacheManualClearNotice;
   private workflowId?: number;
   private usageKeys = new Set<string>();
 
@@ -81,6 +89,26 @@ export class CachePanelComponent implements OnInit {
         );
         this.updateVisibleEntries();
       });
+    this.cacheEntriesService
+      .getInvalidationNoticeStream()
+      .pipe(untilDestroyed(this))
+      .subscribe(notice => {
+        if (notice && this.workflowId && notice.workflowId === 
this.workflowId) {
+          this.invalidationNotice = notice;
+        } else if (!notice) {
+          this.invalidationNotice = undefined;
+        }
+      });
+    this.cacheEntriesService
+      .getManualClearNoticeStream()
+      .pipe(untilDestroyed(this))
+      .subscribe(notice => {
+        if (notice && this.workflowId && notice.workflowId === 
this.workflowId) {
+          this.manualClearNotice = notice;
+        } else if (!notice) {
+          this.manualClearNotice = undefined;
+        }
+      });
   }
 
   /**
@@ -94,16 +122,28 @@ export class CachePanelComponent implements OnInit {
   }
 
   /**
-   * Removes all cached outputs for the workflow and updates shared cache 
state.
+   * Removes all cached outputs for the workflow, updates shared cache state,
+   * and shows a cache-clear notification on success.
    */
   public clearCacheEntries(): void {
     if (!this.workflowId) {
       return;
     }
+    const removedCount = this.cacheEntries.length;
     this.removing = true;
     this.cacheEntriesService
       .clearWorkflowCacheEntries(this.workflowId)
       .pipe(
+        tap(() => {
+          const entryLabel = removedCount === 1 ? "entry" : "entries";
+          const message = removedCount === 0 ? "Cache cleared." : `Cleared 
${removedCount} cache ${entryLabel}.`;
+          this.cacheEntriesService.notifyManualClear({
+            workflowId: this.workflowId!,
+            message,
+            removedCount,
+            timestamp: new Date(),
+          });
+        }),
         finalize(() => {
           this.removing = false;
         }),
@@ -149,4 +189,49 @@ export class CachePanelComponent implements OnInit {
   public shortenSubdagHash(hash: string): string {
     return hash.length > 8 ? hash.slice(0, 8) : hash;
   }
+
+  /**
+   * Formats the auto-invalidation notification message.
+   */
+  public formatInvalidationMessage(notice: CacheInvalidationNotice): string {
+    const entryLabel = notice.removedCount === 1 ? "entry" : "entries";
+    return `Auto-removed ${notice.removedCount} cache ${entryLabel} after 
workflow changes.`;
+  }
+
+  /**
+   * Formats the auto-invalidation notification timestamp for display.
+   */
+  public formatInvalidationTimestamp(notice: CacheInvalidationNotice): string {
+    return `At ${notice.timestamp.toLocaleString()}`;
+  }
+
+  /**
+   * Clears the cache invalidation notice from the panel.
+   */
+  public clearInvalidationNotice(): void {
+    this.invalidationNotice = undefined;
+    this.cacheEntriesService.clearInvalidationNotice();
+  }
+
+  /**
+   * Formats the manual cache-clear notification message.
+   */
+  public formatManualClearMessage(notice: CacheManualClearNotice): string {
+    return notice.message;
+  }
+
+  /**
+   * Formats the manual cache-clear notification timestamp for display.
+   */
+  public formatManualClearTimestamp(notice: CacheManualClearNotice): string {
+    return `At ${notice.timestamp.toLocaleString()}`;
+  }
+
+  /**
+   * Clears the manual cache-clear notice from the panel.
+   */
+  public clearManualClearNotice(): void {
+    this.manualClearNotice = undefined;
+    this.cacheEntriesService.clearManualClearNotice();
+  }
 }
diff --git 
a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts
 
b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts
index 70e9236bc1..b26f41d66a 100644
--- 
a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts
+++ 
b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts
@@ -150,7 +150,7 @@ export class ContextMenuComponent {
   }
 
   /**
-   * Clears cached outputs produced by the selected operator.
+   * Clears cached outputs produced by the selected operator and emits a cache 
panel notice.
    */
   public clearCacheForSelectedOperator(): void {
     const workflowId = this.workflowActionService.getWorkflowMetadata()?.wid;
@@ -158,16 +158,39 @@ export class ContextMenuComponent {
       return;
     }
     const operatorId = this.getSelectedOperatorID();
+    const beforeKeys = new Set(
+      this.cacheEntriesService.getCacheEntriesSnapshot().map(entry => 
this.cacheEntriesService.buildEntryKey(entry))
+    );
     this.workflowExecutionsService
       .evictWorkflowCacheEntries(workflowId, [operatorId])
       .pipe(untilDestroyed(this))
       .subscribe(() => {
-        this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe();
+        
this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(entries => {
+          const afterKeys = new Set(entries.map(entry => 
this.cacheEntriesService.buildEntryKey(entry)));
+          let removedCount = 0;
+          beforeKeys.forEach(key => {
+            if (!afterKeys.has(key)) {
+              removedCount += 1;
+            }
+          });
+          const entryLabel = removedCount === 1 ? "entry" : "entries";
+          const message =
+            removedCount === 0
+              ? "Cache cleared for selected operator."
+              : `Cleared ${removedCount} cache ${entryLabel} for selected 
operator.`;
+          this.cacheEntriesService.notifyManualClear({
+            workflowId,
+            message,
+            removedCount,
+            timestamp: new Date(),
+          });
+        });
       });
   }
 
   /**
-   * Clears cached outputs produced by the selected operator and its upstream 
operators.
+   * Clears cached outputs produced by the selected operator and its upstream 
operators,
+   * and emits a cache panel notice.
    */
   public clearCacheUpToSelectedOperator(): void {
     const workflowId = this.workflowActionService.getWorkflowMetadata()?.wid;
@@ -176,11 +199,33 @@ export class ContextMenuComponent {
     }
     const operatorId = this.getSelectedOperatorID();
     const upstreamOperatorIds = this.collectUpstreamOperatorIds(operatorId);
+    const beforeKeys = new Set(
+      this.cacheEntriesService.getCacheEntriesSnapshot().map(entry => 
this.cacheEntriesService.buildEntryKey(entry))
+    );
     this.workflowExecutionsService
       .evictWorkflowCacheEntries(workflowId, upstreamOperatorIds)
       .pipe(untilDestroyed(this))
       .subscribe(() => {
-        this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe();
+        
this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(entries => {
+          const afterKeys = new Set(entries.map(entry => 
this.cacheEntriesService.buildEntryKey(entry)));
+          let removedCount = 0;
+          beforeKeys.forEach(key => {
+            if (!afterKeys.has(key)) {
+              removedCount += 1;
+            }
+          });
+          const entryLabel = removedCount === 1 ? "entry" : "entries";
+          const message =
+            removedCount === 0
+              ? "Cache cleared for selected operator and upstream."
+              : `Cleared ${removedCount} cache ${entryLabel} for selected 
operator and upstream.`;
+          this.cacheEntriesService.notifyManualClear({
+            workflowId,
+            message,
+            removedCount,
+            timestamp: new Date(),
+          });
+        });
       });
   }
 
diff --git 
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
 
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
index 23efe3347f..02a5e30d93 100644
--- 
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
+++ 
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
@@ -321,7 +321,8 @@ export class WorkflowCompilingService {
 
   /**
    * Triggers cache invalidation after a successful compilation.
-   * Cache entries with mismatched fingerprints are removed on the backend.
+   * Cache entries with mismatched fingerprints are removed on the backend, and
+   * the cache panel is notified when entries are actually removed.
    */
   private invalidateMismatchedCacheEntries(logicalPlan: LogicalPlan): void {
     const workflowId = this.workflowActionService.getWorkflowMetadata().wid;
@@ -332,6 +333,9 @@ export class WorkflowCompilingService {
     ) {
       return;
     }
+    const beforeKeys = new Set(
+      this.cacheEntriesService.getCacheEntriesSnapshot().map(entry => 
this.cacheEntriesService.buildEntryKey(entry))
+    );
     this.workflowExecutionsService
       .invalidateWorkflowCacheEntries(workflowId, logicalPlan)
       .pipe(
@@ -341,7 +345,16 @@ export class WorkflowCompilingService {
         })
       )
       .subscribe(() => {
-        this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe();
+        
this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(entries => {
+          const afterKeys = new Set(entries.map(entry => 
this.cacheEntriesService.buildEntryKey(entry)));
+          let removedCount = 0;
+          beforeKeys.forEach(key => {
+            if (!afterKeys.has(key)) {
+              removedCount += 1;
+            }
+          });
+          this.cacheEntriesService.notifyInvalidation(workflowId, 
removedCount);
+        });
       });
   }
 
diff --git 
a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
 
b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
index b2f5ca0597..96731e3f92 100644
--- 
a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
+++ 
b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
@@ -24,6 +24,19 @@ import { WorkflowExecutionsService } from 
"../../../dashboard/service/user/workf
 import { WorkflowCacheEntry } from 
"../../../dashboard/type/workflow-cache-entry";
 import { WorkflowActionService } from 
"../workflow-graph/model/workflow-action.service";
 
+export interface CacheInvalidationNotice {
+  workflowId: number;
+  removedCount: number;
+  timestamp: Date;
+}
+
+export interface CacheManualClearNotice {
+  workflowId: number;
+  message: string;
+  removedCount?: number;
+  timestamp: Date;
+}
+
 /**
  * Shares workflow cache entry state across components and keeps it in sync 
with the backend.
  */
@@ -33,6 +46,8 @@ import { WorkflowActionService } from 
"../workflow-graph/model/workflow-action.s
 export class WorkflowCacheEntriesService {
   private readonly cacheEntriesSubject = new 
BehaviorSubject<ReadonlyArray<WorkflowCacheEntry>>([]);
   private readonly loadingSubject = new BehaviorSubject<boolean>(false);
+  private readonly invalidationNoticeSubject = new 
BehaviorSubject<CacheInvalidationNotice | undefined>(undefined);
+  private readonly manualClearNoticeSubject = new 
BehaviorSubject<CacheManualClearNotice | undefined>(undefined);
   private currentWorkflowId?: number;
 
   constructor(
@@ -74,6 +89,20 @@ export class WorkflowCacheEntriesService {
     return this.loadingSubject.asObservable();
   }
 
+  /**
+   * Returns a stream of cache invalidation notices triggered by compilation.
+   */
+  public getInvalidationNoticeStream(): Observable<CacheInvalidationNotice | 
undefined> {
+    return this.invalidationNoticeSubject.asObservable();
+  }
+
+  /**
+   * Returns a stream of manual cache clear/eviction notices.
+   */
+  public getManualClearNoticeStream(): Observable<CacheManualClearNotice | 
undefined> {
+    return this.manualClearNoticeSubject.asObservable();
+  }
+
   /**
    * Returns the latest cache entry snapshot.
    */
@@ -108,6 +137,48 @@ export class WorkflowCacheEntriesService {
     );
   }
 
+  /**
+   * Emits a notification when auto-invalidated cache entries are removed.
+   */
+  public notifyInvalidation(workflowId: number, removedCount: number): void {
+    if (removedCount <= 0) {
+      return;
+    }
+    this.invalidationNoticeSubject.next({
+      workflowId,
+      removedCount,
+      timestamp: new Date(),
+    });
+  }
+
+  /**
+   * Clears the stored invalidation notice so it does not reappear.
+   */
+  public clearInvalidationNotice(): void {
+    this.invalidationNoticeSubject.next(undefined);
+  }
+
+  /**
+   * Emits a notification for manual cache clear or eviction actions.
+   */
+  public notifyManualClear(notice: CacheManualClearNotice): void {
+    this.manualClearNoticeSubject.next(notice);
+  }
+
+  /**
+   * Clears the stored manual cache-clear notice so it does not reappear.
+   */
+  public clearManualClearNotice(): void {
+    this.manualClearNoticeSubject.next(undefined);
+  }
+
+  /**
+   * Builds a stable key for a cache entry so invalidation diffs can be 
computed.
+   */
+  public buildEntryKey(entry: WorkflowCacheEntry): string {
+    return `${entry.globalPortId}:${entry.subdagHash}`;
+  }
+
   /**
    * Returns cached output port IDs for a logical operator.
    *

Reply via email to