This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 37099c2a5e80220ba207a072943bc54d1d156b1f
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Jan 19 18:59:17 2026 -0800

    feat(cache): add auto invalidation toggle and auto-update of cache entries.
---
 .../websocket/event/CacheEntryUpdateEvent.scala    | 35 ++++++++++++++++
 .../websocket/event/TexeraWebSocketEvent.scala     |  1 +
 .../texera/web/service/ExecutionCacheService.scala | 34 ++++++++++++++--
 .../web/service/WorkflowExecutionService.scala     | 13 +++++-
 .../storage/ExecutionCacheEntryUpdateStore.scala   | 30 ++++++++++++++
 .../texera/web/storage/ExecutionStateStore.scala   |  4 +-
 docs/operator-port-cache.md                        | 15 ++++---
 .../cache-panel/cache-panel.component.html         |  7 ++++
 .../cache-panel/cache-panel.component.ts           | 46 ++++++++++++++++++++--
 .../workflow-editor/workflow-editor.component.ts   | 43 +++++++++++++++++---
 .../compile-workflow/workflow-compiling.service.ts |  4 ++
 .../workspace/service/joint-ui/joint-ui.service.ts | 33 ++++++++++------
 .../service/workflow-status/cache-usage.service.ts |  6 ++-
 .../workflow-cache-entries.service.ts              | 43 +++++++++++++++++++-
 .../types/workflow-websocket.interface.ts          |  8 ++++
 15 files changed, 286 insertions(+), 36 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheEntryUpdateEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheEntryUpdateEvent.scala
new file mode 100644
index 0000000000..44ac31df6a
--- /dev/null
+++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheEntryUpdateEvent.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.model.websocket.event
+
+/**
+  * Websocket event emitted when a cached output entry is upserted.
+  *
+  * @param globalPortId Serialized GlobalPortIdentity string
+  * @param subdagHash SHA-256 hash of the upstream subDAG fingerprint
+  * @param tupleCount Cached tuple count (optional)
+  * @param sourceExecutionId Execution id that produced the cached output
+  */
+case class CacheEntryUpdateEvent(
+    globalPortId: String,
+    subdagHash: String,
+    tupleCount: Option[Long],
+    sourceExecutionId: Long
+) extends TexeraWebSocketEvent
diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
index a4953c3e1e..6a8bcf6c80 100644
--- a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
+++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
@@ -36,6 +36,7 @@ import org.apache.texera.web.model.websocket.response.{HeartBeatResponse, Modify
     new Type(value = classOf[ConsoleUpdateEvent]),
     new Type(value = classOf[CacheStatusUpdateEvent]),
     new Type(value = classOf[CacheUsageUpdateEvent]),
+    new Type(value = classOf[CacheEntryUpdateEvent]),
     new Type(value = classOf[PaginatedResultEvent]),
     new Type(value = classOf[PythonExpressionEvaluateResponse]),
     new Type(value = classOf[WorkerAssignmentUpdateEvent]),
diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala
index ee5ceeafe9..2f7d3d912a 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala
@@ -20,25 +20,32 @@
 package org.apache.texera.web.service
 
 import com.typesafe.scalalogging.LazyLogging
-import org.apache.texera.amber.core.workflow.{PhysicalPlan, WorkflowContext}
+import org.apache.texera.amber.core.workflow.cache.FingerprintUtil
+import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalPlan, WorkflowContext}
 import org.apache.texera.amber.engine.architecture.controller.PortMaterialized
 import org.apache.texera.amber.engine.common.client.AmberClient
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
 import org.apache.texera.web.SubscriptionManager
+import org.apache.texera.web.model.websocket.event.CacheEntryUpdateEvent
+import org.apache.texera.web.storage.{ExecutionCacheEntryUpdateStore, ExecutionStateStore}
 
 /**
   * Service that listens for port materialization events from the controller
-  * and persists cache metadata.
+  * and persists cache metadata. It also emits cache entry updates so the
+  * frontend can refresh cache info as new cached results appear.
   *
   * @param client AmberClient to register callbacks
   * @param cacheService OperatorPortCacheService for cache persistence
   * @param workflowContext WorkflowContext for workflow/execution IDs
   * @param physicalPlan PhysicalPlan for fingerprint computation
+  * @param executionStateStore Execution state store for cache entry updates
   */
 class ExecutionCacheService(
     client: AmberClient,
     cacheService: OperatorPortCacheService,
     workflowContext: WorkflowContext,
-    physicalPlan: PhysicalPlan
+    physicalPlan: PhysicalPlan,
+    executionStateStore: ExecutionStateStore
 ) extends SubscriptionManager
     with LazyLogging {
 
@@ -60,6 +67,7 @@ class ExecutionCacheService(
               evt.resultUri,
               evt.tupleCount
             )
+            emitCacheEntryUpdate(evt.portId, evt.tupleCount)
           } catch {
             case e: Throwable =>
               logger.error(s"Failed to upsert cache for port ${evt.portId}", e)
@@ -67,4 +75,24 @@ class ExecutionCacheService(
         })
     )
   }
+
+  /**
+    * Emits a cache entry update after a cache upsert so websocket clients
+    * can refresh cache metadata for the current execution.
+    */
+  private def emitCacheEntryUpdate(
+      portId: GlobalPortIdentity,
+      tupleCount: Option[Long]
+  ): Unit = {
+    val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId)
+    val updateEvent = CacheEntryUpdateEvent(
+      globalPortId = portId.serializeAsString,
+      subdagHash = fingerprint.subdagHash,
+      tupleCount = tupleCount,
+      sourceExecutionId = workflowContext.executionId.id
+    )
+    executionStateStore.cacheEntryUpdateStore.updateState(_ =>
+      ExecutionCacheEntryUpdateStore(Some(updateEvent), System.currentTimeMillis())
+    )
+  }
 }
diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
index 4e0e841645..9a9a8a11b9 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
@@ -95,6 +95,11 @@ class WorkflowExecutionService(
       Iterable(CacheUsageUpdateEvent(newState.cachedOutputs))
     })
   )
+  addSubscription(
+    executionStateStore.cacheEntryUpdateStore.registerDiffHandler((_, newState) => {
+      newState.lastUpdate.toList
+    })
+  )
 
   private def createStateEvent(state: ExecutionMetadataStore): WorkflowStateEvent = {
     if (state.isRecovering && state.state != COMPLETED) {
@@ -185,7 +190,13 @@ class WorkflowExecutionService(
       new ExecutionReconfigurationService(client, executionStateStore, workflow)
     executionStatsService = new ExecutionStatsService(client, executionStateStore, workflow.context)
     executionCacheService =
-      new ExecutionCacheService(client, cacheService, workflow.context, workflow.physicalPlan)
+      new ExecutionCacheService(
+        client,
+        cacheService,
+        workflow.context,
+        workflow.physicalPlan,
+        executionStateStore
+      )
     executionRuntimeService = new ExecutionRuntimeService(
       client,
       executionStateStore,
diff --git a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheEntryUpdateStore.scala b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheEntryUpdateStore.scala
new file mode 100644
index 0000000000..14bc324f75
--- /dev/null
+++ b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheEntryUpdateStore.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.storage
+
+import org.apache.texera.web.model.websocket.event.CacheEntryUpdateEvent
+
+/**
+  * Tracks the latest cache upsert so websocket clients can refresh cache metadata.
+  */
+case class ExecutionCacheEntryUpdateStore(
+    lastUpdate: Option[CacheEntryUpdateEvent] = None,
+    updatedAt: Long = System.currentTimeMillis()
+)
diff --git a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
index d307c8c021..6a6539db0e 100644
--- a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
+++ b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
@@ -56,6 +56,7 @@ class ExecutionStateStore {
   val breakpointStore = new StateStore(ExecutionBreakpointStore())
   val reconfigurationStore = new StateStore(ExecutionReconfigurationStore())
   val cacheUsageStore = new StateStore(ExecutionCacheUsageStore())
+  val cacheEntryUpdateStore = new StateStore(ExecutionCacheEntryUpdateStore())
 
   /**
     * Returns all state stores that should publish websocket updates for an execution.
@@ -67,7 +68,8 @@ class ExecutionStateStore {
       breakpointStore,
       metadataStore,
       reconfigurationStore,
-      cacheUsageStore
+      cacheUsageStore,
+      cacheEntryUpdateStore
     )
   }
 }
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index 36c750910a..a98e9801c7 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -188,7 +188,7 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached
 **Cache metadata UI (Implemented)**:
 
 - Left panel "Cache" tab listing workflow cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash)
-- Highlight cache entries usable by the current execution (fingerprint match)
+- Highlight cache entries usable by the current execution (fingerprint + source execution id match)
 - Cache panel toggle to show only entries usable by the current execution
 - Cache panel action to clear all cached results (deletes cache entries, result documents, and port result records)
 - Output port labels show tuple counts, plus a second line with source execution id for cached outputs
@@ -202,15 +202,18 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached
 - Context menu actions: "Clear cache" (selected operator) and "Clear cache up to this operator" (includes disabled operators and the selected operator)
 - Cache invalidation on compile: evict cache entries whose fingerprints no longer match the current workflow
 - Cache panel shows a notice when auto-invalidation removes entries after a compile
+- Cache panel toggle to enable or disable auto invalidation after compile
 - Cache panel shows a notice when users manually clear or evict cache entries (panel or context menu)
+- Cache panel auto-refreshes when cache entries are upserted or an execution completes
 - Compile endpoint accepts HashJoin join types (e.g., "full outer") to avoid 400s during invalidation
 - TODO: Use source execution runtime stats for cached operator input/output counts, with fallback to `operator_port_cache.tuple_count` when stats are missing
 
 **Cache usage updates**:
 
-- `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint match)
-- Frontend uses the event to drive cache entry highlighting and per-port cache labels
+- `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint + source execution id match)
+- Frontend uses the event to drive cache entry highlighting and per-port cache labels; matching includes source execution id to avoid treating new upserts as usable
 - Cache usage snapshots are re-emitted on websocket connect to keep labels visible after refresh
+- `CacheEntryUpdateEvent` is emitted when cached outputs are upserted during execution
 
 ### 5. Service & DAO Architecture
 
@@ -383,9 +386,9 @@ ExecutionCacheService ────→ upsertCachedOutput()     OperatorPortCache
   - Region visualization: blue fill (`rgba(24,144,255,0.3)`) for cached regions in `workflow-editor.component.ts`
   - Region visibility: shared state via `WorkflowActionService.showRegion` ensures correct visibility when regions are created during execution
 - **Cache metadata UI** (Phase 1.3 - Complete):
-  - `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint match)
+  - `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint + source execution id match)
   - Left panel "Cache" tab lists cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash)
-  - Cache entries highlight when usable by the current execution (fingerprint match)
+  - Cache entries highlight when usable by the current execution (fingerprint + source execution id match)
   - Cache panel "Clear cache" action removes cached results and associated result artifacts
 - Cache panel can filter to only show entries usable by the current execution
   - Cached output ports show source execution id on a second label line
@@ -528,7 +531,7 @@ The cache system integrates with three layers:
 #### 1.3 Cache Metadata UI ✓ COMPLETE
 
 - [X] Add left panel "Cache" tab listing workflow cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash)
-- [x] Highlight cache entries usable by the current execution (fingerprint match)
+- [x] Highlight cache entries usable by the current execution (fingerprint + source execution id match)
 - [X] Show per-output-port sourceExecutionId on a second output port label line
 - [X] Re-emit cache usage snapshots on websocket connect to refresh cache labels after reload
 - [X] Keep result URI hidden in the UI
diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
index 78db959644..7d0bedb6c0 100644
--- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
+++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
@@ -38,6 +38,13 @@
       nzPopconfirmTitle="Remove all cached results for this workflow?">
       Clear cache
     </button>
+    <button
+      nz-button
+      nzSize="small"
+      nzType="default"
+      (click)="toggleAutoInvalidation()">
+      Auto invalidation: {{ autoInvalidationEnabled ? 'On' : 'Off' }}
+    </button>
     <label class="cache-panel__toggle">
       <nz-switch
         nzSize="small"
diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
index e8023b59c1..f44d48f6b9 100644
--- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
+++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
@@ -22,7 +22,9 @@ import { ActivatedRoute } from "@angular/router";
 import { finalize, tap } from "rxjs/operators";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
 import { WorkflowCacheEntry } from "../../../../dashboard/type/workflow-cache-entry";
+import { ExecuteWorkflowService } from "../../../service/execute-workflow/execute-workflow.service";
 import { CacheUsageService } from "../../../service/workflow-status/cache-usage.service";
+import { ExecutionState } from "../../../types/execute-workflow.interface";
 import {
   CacheInvalidationNotice,
   CacheManualClearNotice,
@@ -44,6 +46,8 @@ export class CachePanelComponent implements OnInit {
   public visibleEntries: WorkflowCacheEntry[] = [];
   /** When true, only cache entries usable by the current execution are shown. */
   public showUsableOnly = false;
+  /** When true, auto invalidation runs after successful compilation. */
+  public autoInvalidationEnabled = true;
   /** True while the cache eviction request is in flight. */
   public removing = false;
   public loading = false;
@@ -57,6 +61,7 @@ export class CachePanelComponent implements OnInit {
   constructor(
     private cacheEntriesService: WorkflowCacheEntriesService,
     private cacheUsageService: CacheUsageService,
+    private executeWorkflowService: ExecuteWorkflowService,
     private route: ActivatedRoute
   ) {}
 
@@ -85,7 +90,13 @@ export class CachePanelComponent implements OnInit {
       .pipe(untilDestroyed(this))
       .subscribe(entries => {
         this.usageKeys = new Set(
-          entries.map(entry => this.cacheUsageService.buildUsageKey(entry.globalPortId, entry.subdagHash))
+          entries.map(entry =>
+            this.cacheUsageService.buildUsageKey(
+              entry.globalPortId,
+              entry.subdagHash,
+              entry.sourceExecutionId
+            )
+          )
         );
         this.updateVisibleEntries();
       });
@@ -109,6 +120,20 @@ export class CachePanelComponent implements OnInit {
           this.manualClearNotice = undefined;
         }
       });
+    this.cacheEntriesService
+      .getAutoInvalidationEnabledStream()
+      .pipe(untilDestroyed(this))
+      .subscribe(enabled => {
+        this.autoInvalidationEnabled = enabled;
+      });
+    this.executeWorkflowService
+      .getExecutionStateStream()
+      .pipe(untilDestroyed(this))
+      .subscribe(({ current }) => {
+        if (current.state === ExecutionState.Completed) {
+          this.refresh();
+        }
+      });
   }
 
   /**
@@ -153,11 +178,17 @@ export class CachePanelComponent implements OnInit {
   }
 
   /**
-   * Returns true when a cache entry is usable by the current execution (fingerprint match),
-   * regardless of whether the scheduler chooses to reuse it.
+   * Returns true when a cache entry is usable by the current execution
+   * (fingerprint and source execution match), regardless of whether the scheduler chooses to reuse it.
    */
   public isUsableForExecution(entry: WorkflowCacheEntry): boolean {
-    return this.usageKeys.has(this.cacheUsageService.buildUsageKey(entry.globalPortId, entry.subdagHash));
+    return this.usageKeys.has(
+      this.cacheUsageService.buildUsageKey(
+        entry.globalPortId,
+        entry.subdagHash,
+        entry.sourceExecutionId
+      )
+    );
   }
 
   /**
@@ -169,6 +200,13 @@ export class CachePanelComponent implements OnInit {
       : this.cacheEntries;
   }
 
+  /**
+   * Toggles auto invalidation of cached results after compilation.
+   */
+  public toggleAutoInvalidation(): void {
+    this.cacheEntriesService.toggleAutoInvalidation();
+  }
+
   /**
    * Formats tuple counts for display.
    */
diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
index cc0126df69..1d8319b596 100644
--- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
+++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
@@ -139,7 +139,30 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy
     magnet: SVGElement,
     event: joint.dia.Event
   ): boolean {
-    return magnet && magnet.getAttribute("port-group") === "out";
+    const portGroup = WorkflowEditorComponent.getMagnetAttribute(magnet, "port-group");
+    return portGroup === "out";
+  }
+
+  /**
+   * Resolves a port attribute from a magnet or its nearest port element.
+   * This keeps port interactions working even when ports have extra markup.
+   */
+  private static getMagnetAttribute(
+    magnet: SVGElement | null | undefined,
+    attribute: "port" | "port-group"
+  ): string | null {
+    if (!magnet) {
+      return null;
+    }
+    const direct = magnet.getAttribute(attribute);
+    if (direct) {
+      return direct;
+    }
+    const closest = magnet.closest(`[${attribute}]`);
+    if (closest instanceof SVGElement) {
+      return closest.getAttribute(attribute);
+    }
+    return null;
   }
 
   ngAfterViewInit() {
@@ -924,9 +947,14 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy
         // set the multi-select mode
         this.wrapper.setMultiSelectMode(<boolean>event[1].shiftKey);
 
+        const portID = WorkflowEditorComponent.getMagnetAttribute(event[2] as SVGElement, "port");
+        if (!portID) {
+          return;
+        }
+
         const clickedPortID: LogicalPort = {
           operatorID: event[0].model.id as string,
-          portID: event[2].getAttribute("port") as string,
+          portID: portID,
         };
 
         if (event[1].shiftKey) {
@@ -1038,20 +1066,23 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy
     end: joint.dia.LinkEnd,
     linkView: joint.dia.LinkView
   ): boolean {
+    const sourcePortGroup = WorkflowEditorComponent.getMagnetAttribute(sourceMagnet, "port-group");
+    const targetPortGroup = WorkflowEditorComponent.getMagnetAttribute(targetMagnet, "port-group");
+
     // user cannot draw connection starting from the input port (left side)
-    if (sourceMagnet && sourceMagnet.getAttribute("port-group") === "in") {
+    if (sourcePortGroup === "in") {
       return false;
     }
 
     // user cannot connect to the output port (right side)
-    if (targetMagnet && targetMagnet.getAttribute("port-group") === "out") {
+    if (targetPortGroup === "out") {
       return false;
     }
 
     const sourceCellID = sourceView.model.id.toString();
-    const sourcePortID = sourceMagnet?.getAttribute("port");
+    const sourcePortID = WorkflowEditorComponent.getMagnetAttribute(sourceMagnet, "port");
     const targetCellID = targetView.model.id.toString();
-    const targetPortID = targetMagnet?.getAttribute("port");
+    const targetPortID = WorkflowEditorComponent.getMagnetAttribute(targetMagnet, "port");
 
     return this.validateOperatorConnection(sourceCellID, sourcePortID, targetCellID, targetPortID);
   }
diff --git a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
index 02a5e30d93..d2beccb2ee 100644
--- a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
+++ b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
@@ -323,6 +323,7 @@ export class WorkflowCompilingService {
    * Triggers cache invalidation after a successful compilation.
    * Cache entries with mismatched fingerprints are removed on the backend, and
    * the cache panel is notified when entries are actually removed.
+   * This is skipped when auto invalidation is disabled in the cache panel.
    */
   private invalidateMismatchedCacheEntries(logicalPlan: LogicalPlan): void {
     const workflowId = this.workflowActionService.getWorkflowMetadata().wid;
@@ -333,6 +334,9 @@ export class WorkflowCompilingService {
     ) {
       return;
     }
+    if (!this.cacheEntriesService.isAutoInvalidationEnabled()) {
+      return;
+    }
     const beforeKeys = new Set(
       this.cacheEntriesService.getCacheEntriesSnapshot().map(entry => this.cacheEntriesService.buildEntryKey(entry))
     );
diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
index 7f098a3e10..d93110ebf2 100644
--- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
+++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
@@ -427,7 +427,7 @@ export class JointUIService {
   }
 
   /**
-   * Updates cached output port indicator rings without changing counts or labels.
+   * Updates cached output port indicator badges without changing counts or labels.
    */
   public changeOperatorCachedPorts(
     jointPaper: joint.dia.Paper,
@@ -692,7 +692,8 @@ export class JointUIService {
   /**
    * This function changes the default svg of the operator ports.
    * It hides the port label that will display 'out/in' beside the operators.
-   * Port labels remain visible for per-port metrics and cache metadata.
+   * Port labels remain visible for per-port metrics and cache metadata, and cached
+   * output ports show a small badge that never captures pointer events so ports stay interactive.
    *
    * @returns the custom attributes of the ports
    */
@@ -702,13 +703,15 @@ export class JointUIService {
         fill: "#A0A0A0",
         r: 5,
         stroke: "none",
+        "pointer-events": "all",
       },
       ".port-cache-indicator": {
-        fill: "none",
-        r: 7,
-        stroke: "#1890ff",
-        "stroke-width": 1.5,
+        fill: "#fadb14",
+        points: "0,-12 4,-9 0,-6 -4,-9",
+        stroke: "#ad8b00",
+        "stroke-width": 1,
         display: "none",
+        "pointer-events": "none",
       },
       ".port-label": {
         visibility: "visible",
@@ -718,27 +721,31 @@ export class JointUIService {
         ref: ".port-body",
         "ref-y": 0.5,
         "y-alignment": "middle",
+        "pointer-events": "none",
       },
     };
   }
 
   /**
-   * Defines the default markup for ports.
+   * Defines the default markup for ports, including the cache badge and port body.
    */
   public static getCustomPortMarkup(): any[] {
     return [
       {
         tagName: "circle",
-        selector: ".port-cache-indicator",
+        selector: ".port-body",
         attributes: {
-          class: "port-cache-indicator",
+          class: "port-body",
+          magnet: true,
         },
       },
       {
-        tagName: "circle",
-        selector: ".port-body",
+        tagName: "polygon",
+        selector: ".port-cache-indicator",
         attributes: {
-          class: "port-body",
+          class: "port-cache-indicator",
+          magnet: false,
+          "pointer-events": "none",
         },
       },
     ];
@@ -754,6 +761,7 @@ export class JointUIService {
         selector: ".port-label",
         attributes: {
           class: "port-label",
+          "pointer-events": "none",
         },
       },
       {
@@ -761,6 +769,7 @@ export class JointUIService {
         selector: ".port-cache-label",
         attributes: {
           class: "port-cache-label",
+          "pointer-events": "none",
         },
       },
     ];
diff --git a/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts b/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts
index 59f10e8d54..99c89ce848 100644
--- a/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts
+++ b/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts
@@ -68,9 +68,11 @@ export class CacheUsageService {
 
   /**
    * Builds a stable key to match cache entries against cache usage updates.
+   * Includes the source execution to avoid tagging replaced cache entries as usable.
    */
-  public buildUsageKey(globalPortId: string, subdagHash: string): string {
-    return `${globalPortId}|${subdagHash}`;
+  public buildUsageKey(globalPortId: string, subdagHash: string, sourceExecutionId?: number): string {
+    const executionToken = sourceExecutionId == null ? "unknown" : sourceExecutionId.toString();
+    return `${globalPortId}|${subdagHash}|${executionToken}`;
   }
 
   private registerCacheUsageListener(): void {
diff --git a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
index 96731e3f92..852f9fddfb 100644
--- a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
+++ b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
@@ -19,9 +19,10 @@
 
 import { Injectable } from "@angular/core";
 import { BehaviorSubject, Observable } from "rxjs";
-import { finalize, tap } from "rxjs/operators";
+import { auditTime, finalize, tap } from "rxjs/operators";
 import { WorkflowExecutionsService } from "../../../dashboard/service/user/workflow-executions/workflow-executions.service";
 import { WorkflowCacheEntry } from "../../../dashboard/type/workflow-cache-entry";
+import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service";
 import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service";
 
 export interface CacheInvalidationNotice {
@@ -48,10 +49,13 @@ export class WorkflowCacheEntriesService {
   private readonly loadingSubject = new BehaviorSubject<boolean>(false);
   private readonly invalidationNoticeSubject = new BehaviorSubject<CacheInvalidationNotice | undefined>(undefined);
   private readonly manualClearNoticeSubject = new BehaviorSubject<CacheManualClearNotice | undefined>(undefined);
+  /** Whether compile-time cache invalidation should run automatically. */
+  private readonly autoInvalidationEnabledSubject = new BehaviorSubject<boolean>(true);
   private currentWorkflowId?: number;
 
   constructor(
     private workflowExecutionsService: WorkflowExecutionsService,
+    private workflowWebsocketService: WorkflowWebsocketService,
     private workflowActionService: WorkflowActionService
   ) {
     this.workflowActionService.workflowMetaDataChanged().subscribe(metadata => {
@@ -73,6 +77,15 @@ export class WorkflowCacheEntriesService {
       this.currentWorkflowId = initialWorkflowId;
       this.refreshCacheEntries(initialWorkflowId).subscribe();
     }
+
+    this.workflowWebsocketService
+      .subscribeToEvent("CacheEntryUpdateEvent")
+      .pipe(auditTime(250))
+      .subscribe(() => {
+        if (this.currentWorkflowId && this.currentWorkflowId > 0) {
+          this.refreshCacheEntries(this.currentWorkflowId).subscribe();
+        }
+      });
   }
 
   /**
@@ -103,6 +116,13 @@ export class WorkflowCacheEntriesService {
     return this.manualClearNoticeSubject.asObservable();
   }
 
+  /**
+   * Returns a stream of the auto-invalidation toggle state.
+   */
+  public getAutoInvalidationEnabledStream(): Observable<boolean> {
+    return this.autoInvalidationEnabledSubject.asObservable();
+  }
+
   /**
    * Returns the latest cache entry snapshot.
    */
@@ -110,6 +130,27 @@ export class WorkflowCacheEntriesService {
     return this.cacheEntriesSubject.value;
   }
 
+  /**
+   * Returns true when auto invalidation on compile is enabled.
+   */
+  public isAutoInvalidationEnabled(): boolean {
+    return this.autoInvalidationEnabledSubject.value;
+  }
+
+  /**
+   * Updates the auto-invalidation toggle state.
+   */
+  public setAutoInvalidationEnabled(enabled: boolean): void {
+    this.autoInvalidationEnabledSubject.next(enabled);
+  }
+
+  /**
+   * Flips the auto-invalidation toggle state.
+   */
+  public toggleAutoInvalidation(): void {
+    this.setAutoInvalidationEnabled(!this.autoInvalidationEnabledSubject.value);
+  }
+
   /**
    * Refreshes cache entries for a workflow and updates shared state.
    *
diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts b/frontend/src/app/workspace/types/workflow-websocket.interface.ts
index e22633f213..a6c953e640 100644
--- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts
+++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts
@@ -157,6 +157,13 @@ export interface CacheUsageUpdateEvent
     cachedOutputs: ReadonlyArray<CachedPortUsage>;
   }> {}
 
+export type CacheEntryUpdateEvent = Readonly<{
+  globalPortId: string;
+  subdagHash: string;
+  tupleCount?: number;
+  sourceExecutionId: number;
+}>;
+
 export type PythonExpressionEvaluateRequest = Readonly<{
   expression: string;
   operatorId: string;
@@ -252,6 +259,7 @@ export type TexeraWebsocketEventTypeMap = {
   WorkflowAvailableResultEvent: WorkflowAvailableResultEvent;
   CacheStatusUpdateEvent: CacheStatusUpdateEvent;
   CacheUsageUpdateEvent: CacheUsageUpdateEvent;
+  CacheEntryUpdateEvent: CacheEntryUpdateEvent;
   PythonExpressionEvaluateResponse: PythonExpressionEvaluateResponse;
   WorkerAssignmentUpdateEvent: WorkerAssignmentUpdateEvent;
   ModifyLogicResponse: ModifyLogicResponse;

Reply via email to