This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 6201c9e742a3b5e4da037ceb7e1e4fcd75387391
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Fri Jan 16 17:38:29 2026 -0800

    feat(cache): add cache invalidation, clear cache on operator, and 
highlighting of cached results.
---
 .../texera/web/dao/OperatorPortCacheDao.scala      |  48 ++++++++
 .../user/workflow/WorkflowExecutionsResource.scala |  60 +++++++++-
 .../web/service/OperatorPortCacheService.scala     |  95 +++++++++++++++-
 docs/operator-port-cache.md                        |  20 +++-
 .../workflow-executions.service.ts                 |  18 ++-
 .../cache-panel/cache-panel.component.ts           |  42 ++++---
 .../context-menu/context-menu.component.html       |  26 +++++
 .../context-menu/context-menu.component.ts         |  74 ++++++++++++-
 .../workflow-editor/workflow-editor.component.ts   |  51 +++++++++
 .../compile-workflow/workflow-compiling.service.ts |  43 ++++++-
 .../workspace/service/joint-ui/joint-ui.service.ts |  38 +++++++
 .../workflow-cache-entries.service.ts              | 123 +++++++++++++++++++++
 12 files changed, 603 insertions(+), 35 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala 
b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
index c1af475b3a..a2e2591802 100644
--- a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
+++ b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
@@ -22,6 +22,7 @@ package org.apache.texera.web.dao
 import org.apache.texera.dao.SqlServer
 import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_CACHE
 import org.jooq.DSLContext
+import org.jooq.impl.DSL
 
 import java.net.URI
 import java.time.OffsetDateTime
@@ -197,4 +198,51 @@ class OperatorPortCacheDao(sqlServer: SqlServer) {
       .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt))
       .execute()
   }
+
+  /**
+    * Removes every cache row of a workflow that belongs to one of the given output ports,
+    * whatever subDAG hash the row was stored under.
+    *
+    * @param workflowId Workflow ID whose cache entries should be deleted
+    * @param globalPortIds Serialized GlobalPortIdentity values; duplicates are ignored
+    * @return Number of rows deleted (0 when no port IDs are supplied)
+    */
+  def deleteByGlobalPortIds(workflowId: Long, globalPortIds: Seq[String]): Int = {
+    val uniquePortIds = globalPortIds.distinct
+    if (uniquePortIds.nonEmpty) {
+      val belongsToWorkflow = OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt)
+      val matchesAnyPort = OPERATOR_PORT_CACHE.GLOBAL_PORT_ID.in(uniquePortIds.asJava)
+      context
+        .deleteFrom(OPERATOR_PORT_CACHE)
+        .where(belongsToWorkflow.and(matchesAnyPort))
+        .execute()
+    } else {
+      // Nothing to match: skip the database round trip.
+      0
+    }
+  }
+
+  /**
+    * Delete cache entries for a workflow by (global_port_id, subdag_hash) pairs.
+    * This removes only the specified fingerprint versions; rows stored for the same
+    * port under a different hash are not matched by the generated condition.
+    *
+    * @param workflowId Workflow ID whose cache entries should be deleted
+    * @param portHashes Sequence of (globalPortId, subdagHash) to remove; duplicates are ignored
+    * @return Number of rows deleted (0 when no pairs are supplied)
+    */
+  def deleteByGlobalPortAndHashes(
+      workflowId: Long,
+      portHashes: Seq[(String, String)]
+  ): Int = {
+    // De-duplicate first (as deleteByGlobalPortIds does) so repeated pairs do not
+    // inflate the generated OR clause.
+    val distinctPairs = portHashes.distinct
+    if (distinctPairs.isEmpty) {
+      0
+    } else {
+      // One "(port = ? AND hash = ?)" condition per pair, OR-ed together below.
+      val conditions = distinctPairs.map {
+        case (portId, subdagHash) =>
+          OPERATOR_PORT_CACHE.GLOBAL_PORT_ID
+            .eq(portId)
+            .and(OPERATOR_PORT_CACHE.SUBDAG_HASH.eq(subdagHash))
+      }
+      context
+        .deleteFrom(OPERATOR_PORT_CACHE)
+        .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt))
+        .and(DSL.or(conditions.asJava))
+        .execute()
+    }
+  }
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
index 70fbdd1883..b8bf13ec67 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
@@ -28,7 +28,7 @@ import org.apache.texera.amber.core.storage.{
 }
 import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.virtualidentity._
-import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
+import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, 
PortIdentity, WorkflowContext}
 import 
org.apache.texera.amber.engine.architecture.logreplay.{ReplayDestination, 
ReplayLogRecord}
 import org.apache.texera.amber.engine.common.Utils.{maptoStatusCode, 
stringToAggregatedState}
 import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage
@@ -44,6 +44,7 @@ import 
org.apache.texera.dao.jooq.generated.tables.daos.WorkflowExecutionsDao
 import org.apache.texera.dao.jooq.generated.tables.pojos.{WorkflowExecutions, 
User => UserPojo}
 import org.apache.texera.web.dao.OperatorPortCacheDao
 import org.apache.texera.web.model.http.request.result.ResultExportRequest
+import org.apache.texera.web.model.websocket.request.LogicalPlanPojo
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._
 import org.apache.texera.web.service.{
   ExecutionsMetadataPersistService,
@@ -52,6 +53,7 @@ import org.apache.texera.web.service.{
 }
 import org.jooq.DSLContext
 import play.api.libs.json.Json
+import org.apache.texera.workflow.WorkflowCompiler
 
 import java.net.URI
 import java.sql.Timestamp
@@ -589,6 +591,13 @@ case class ExecutionGroupDeleteRequest(wid: Integer, eIds: 
Array[Integer])
 
 case class ExecutionRenameRequest(wid: Integer, eId: Integer, executionName: 
String)
 
+/**
+  * JSON request body of the cache eviction endpoint (`POST /{wid}/cache/evict`).
+  *
+  * @param logicalOpIds Logical operator IDs whose output caches should be removed
+  */
+case class CacheEvictionRequest(logicalOpIds: List[String])
+
 @Produces(Array(MediaType.APPLICATION_JSON, 
MediaType.APPLICATION_OCTET_STREAM, "application/zip"))
 @Path("/executions")
 class WorkflowExecutionsResource {
@@ -848,6 +857,55 @@ class WorkflowExecutionsResource {
     clearWorkflowCacheEntriesInternal(wid, sessionUser)
   }
 
+  /**
+    * Evicts cache entries for the specified logical operators.
+    * This removes all cached outputs produced by those operators, regardless of fingerprint.
+    *
+    * A missing request body, a missing ID list, and null or blank IDs are tolerated: they are
+    * skipped, and the call does nothing when no usable ID remains.
+    *
+    * @param request JSON payload carrying the logical operator IDs to evict (may be null)
+    * @param wid Workflow ID taken from the request path
+    * @param sessionUser Authenticated user; checked with validateUserCanWriteWorkflow
+    */
+  @POST
+  @Consumes(Array(MediaType.APPLICATION_JSON))
+  @Path("/{wid}/cache/evict")
+  @RolesAllowed(Array("REGULAR", "ADMIN"))
+  def evictWorkflowCacheEntries(
+      request: CacheEvictionRequest,
+      @PathParam("wid") wid: Integer,
+      @Auth sessionUser: SessionUser
+  ): Unit = {
+    validateUserCanWriteWorkflow(sessionUser.getUser.getUid, wid)
+    // Guard every nullable layer of the deserialized payload: the body itself, the list,
+    // and the individual elements.
+    val logicalOpIds = Option(request)
+      .flatMap(req => Option(req.logicalOpIds))
+      .getOrElse(List.empty)
+      .filter(_ != null)
+      .map(_.trim)
+      .filter(_.nonEmpty)
+    if (logicalOpIds.nonEmpty) {
+      val dao = new OperatorPortCacheDao(SqlServer.getInstance())
+      val cacheService = new OperatorPortCacheService(dao)
+      cacheService.invalidateCacheForLogicalOperators(WorkflowIdentity(wid.toLong), logicalOpIds)
+    }
+  }
+
+  /**
+    * Invalidates cache entries whose fingerprints do not match the provided logical plan.
+    * Intended to be called after workflow compilation during editing.
+    *
+    * The logical plan is compiled server-side; a compilation failure is reported to the
+    * caller as a BadRequestException and no cache entry is touched.
+    *
+    * @param request Logical plan to compile and compare cached fingerprints against
+    * @param wid Workflow ID taken from the request path
+    * @param sessionUser Authenticated user; checked with validateUserCanWriteWorkflow
+    */
+  @POST
+  @Consumes(Array(MediaType.APPLICATION_JSON))
+  @Path("/{wid}/cache/invalidate")
+  @RolesAllowed(Array("REGULAR", "ADMIN"))
+  def invalidateWorkflowCacheEntries(
+      request: LogicalPlanPojo,
+      @PathParam("wid") wid: Integer,
+      @Auth sessionUser: SessionUser
+  ): Unit = {
+    validateUserCanWriteWorkflow(sessionUser.getUser.getUid, wid)
+    val workflow =
+      try {
+        val workflowContext = new WorkflowContext(workflowId = WorkflowIdentity(wid.toLong))
+        new WorkflowCompiler(workflowContext).compile(request)
+      } catch {
+        // Translate only ordinary failures into a 400; fatal JVM errors and thread
+        // interrupts must keep propagating instead of being reported as a bad request.
+        case scala.util.control.NonFatal(err) =>
+          throw new BadRequestException(
+            s"Failed to compile workflow for cache invalidation: ${err.getMessage}"
+          )
+      }
+    val dao = new OperatorPortCacheDao(SqlServer.getInstance())
+    val cacheService = new OperatorPortCacheService(dao)
+    cacheService.invalidateMismatchedCacheEntries(WorkflowIdentity(wid.toLong), workflow.physicalPlan)
+  }
+
   /**
     * Shared handler for cache eviction endpoints.
     */
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
index 82aef5f4c8..e85afef1c5 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
@@ -22,6 +22,7 @@ package org.apache.texera.web.service
 import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
 import org.apache.texera.amber.core.workflow.cache.FingerprintUtil
 import org.apache.texera.amber.core.workflow.{CachedOutput, 
GlobalPortIdentity, PhysicalPlan}
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde
 import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
 import org.apache.texera.dao.SqlServer
 import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_EXECUTIONS
@@ -30,6 +31,7 @@ import org.apache.texera.amber.core.storage.DocumentFactory
 
 import java.net.URI
 import scala.jdk.CollectionConverters._
+import scala.util.Try
 
 /**
   * Service for operator port result caching.
@@ -40,6 +42,7 @@ import scala.jdk.CollectionConverters._
   * - Cache entry creation when output ports complete
   * - Fingerprint computation and serialization
   * - Cache invalidation and lifecycle management
+  * - Manual eviction by logical operator and compile-time mismatch cleanup
   *
   * @param dao OperatorPortCacheDao for database access
   */
@@ -131,10 +134,59 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) 
{
     */
   def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit = {
     val cacheEntries = dao.listByWorkflow(workflowId.id, Int.MaxValue, 0)
-    val resultUris = cacheEntries.map(_.resultUri).distinct
-    deleteOperatorPortResultsByUris(resultUris)
-    dao.deleteByWorkflow(workflowId.id)
-    clearCachedResultDocuments(resultUris)
+    deleteCacheEntriesByPorts(workflowId, cacheEntries)
+  }
+
+  /**
+    * Removes the cached outputs that belong to the given logical operators.
+    * Editor actions use this to evict cache for selected operators.
+    *
+    * @param workflowId Workflow ID whose cache entries should be deleted
+    * @param logicalOpIds Logical operator IDs whose output caches should be removed;
+    *                     IDs are trimmed and blank ones are dropped
+    * @return Number of cache entries invalidated
+    */
+  def invalidateCacheForLogicalOperators(
+      workflowId: WorkflowIdentity,
+      logicalOpIds: Seq[String]
+  ): Int = {
+    val targetOpIds = logicalOpIds.map(_.trim).filter(_.nonEmpty).toSet
+    if (targetOpIds.isEmpty) {
+      0
+    } else {
+      val ownedByTargets = dao
+        .listByWorkflow(workflowId.id, Int.MaxValue, 0)
+        .filter { entry =>
+          // An entry whose port ID fails to deserialize is never selected for deletion.
+          Try(GlobalPortIdentitySerde.deserializeFromString(entry.globalPortId)).toOption
+            .exists(portId => targetOpIds.contains(portId.opId.logicalOpId.id))
+        }
+      deleteCacheEntriesByPorts(workflowId, ownedByTargets)
+      ownedByTargets.size
+    }
+  }
+
+  /**
+    * Drops the cache entries whose stored subDAG hash differs from the hash computed
+    * against the given plan. This supports compile-time invalidation after workflow edits.
+    *
+    * An entry is also dropped when its hash cannot be recomputed at all, i.e. when
+    * deserializing its port ID or fingerprinting it against the plan throws.
+    *
+    * @param workflowId Workflow ID whose cache entries should be checked
+    * @param physicalPlan Latest compiled physical plan
+    * @return Number of cache entries invalidated
+    */
+  def invalidateMismatchedCacheEntries(
+      workflowId: WorkflowIdentity,
+      physicalPlan: PhysicalPlan
+  ): Int = {
+    val staleEntries = dao.listByWorkflow(workflowId.id, Int.MaxValue, 0).filterNot { entry =>
+      // Keep an entry only if its hash can be recomputed and equals the stored one.
+      Try {
+        val portId = GlobalPortIdentitySerde.deserializeFromString(entry.globalPortId)
+        FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId).subdagHash
+      }.toOption.contains(entry.subdagHash)
+    }
+    deleteCacheEntriesByKeys(workflowId, staleEntries)
+    staleEntries.size
   }
 
   /**
@@ -165,6 +217,41 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) {
     }
   }
 
+  /**
+    * Removes all cache rows stored for the ports of the given entries, together with the
+    * result artifacts those entries point to. Does nothing for an empty input.
+    */
+  private def deleteCacheEntriesByPorts(
+      workflowId: WorkflowIdentity,
+      entries: Seq[OperatorPortCacheRecord]
+  ): Unit = {
+    if (entries.nonEmpty) {
+      val portIds = entries.map(_.globalPortId).distinct
+      val artifactUris = entries.map(_.resultUri).distinct
+      // Port result records first, then the cache rows, then the stored result documents.
+      deleteOperatorPortResultsByUris(artifactUris)
+      dao.deleteByGlobalPortIds(workflowId.id, portIds)
+      clearCachedResultDocuments(artifactUris)
+    }
+  }
+
+  /**
+    * Removes exactly the given cache rows, identified by their (port ID, subDAG hash) pair,
+    * together with the result artifacts they point to. Does nothing for an empty input.
+    */
+  private def deleteCacheEntriesByKeys(
+      workflowId: WorkflowIdentity,
+      entries: Seq[OperatorPortCacheRecord]
+  ): Unit = {
+    if (entries.nonEmpty) {
+      val keys = entries.map(entry => entry.globalPortId -> entry.subdagHash)
+      val artifactUris = entries.map(_.resultUri).distinct
+      // Port result records first, then the cache rows, then the stored result documents.
+      deleteOperatorPortResultsByUris(artifactUris)
+      dao.deleteByGlobalPortAndHashes(workflowId.id, keys)
+      clearCachedResultDocuments(artifactUris)
+    }
+  }
+
   /**
     * Future: Cost-aware eviction when storage quota is exceeded.
     * Phase 3: Lifecycle management research.
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index bde5981d9e..775d06e6e8 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -192,8 +192,17 @@ Entry point: `RegionExecutionCoordinator` constructor 
branches on `region.cached
 - Cache panel toggle to show only entries usable by the current execution
 - Cache panel action to clear all cached results (deletes cache entries, 
result documents, and port result records)
 - Output port labels show tuple counts, plus a second line with source 
execution id for cached outputs
+- Output ports show a cached indicator when any cache entry exists for that 
port
+- Editor context menu can evict cache for the selected operator or its 
upstream operators
 - Result URI hidden from the UI
 
+**Cache UX & invalidation (Implemented)**:
+
+- Output ports show a cached indicator when any cache entry exists for that 
port (no usable/not-usable distinction on the graph)
+- Context menu actions: "Clear cache" (selected operator) and "Clear cache up 
to this operator" (includes disabled operators and the selected operator)
+- Cache invalidation on compile: evict cache entries whose fingerprints no 
longer match the current workflow
+- TODO: Use source execution runtime stats for cached operator input/output 
counts, with fallback to `operator_port_cache.tuple_count` when stats are 
missing
+
 **Cache usage updates**:
 
 - `CacheUsageUpdateEvent` publishes cached outputs usable by the current 
execution (fingerprint match)
@@ -282,6 +291,8 @@ HTTP endpoints for external access:
 - `GET /executions/{workflowId}/cache?limit=<n>&offset=<n>`: List cache 
entries (result_uri omitted)
 - `DELETE /executions/{workflowId}/cache`: Clear all cache entries and delete 
stored result documents
 - `POST /executions/{workflowId}/cache/clear`: POST alternative for 
environments that block DELETE
+- `POST /executions/{workflowId}/cache/evict`: Evict cache entries for 
specified logical operator IDs
+- `POST /executions/{workflowId}/cache/invalidate`: Remove cache entries whose 
fingerprints do not match the provided logical plan
 
 **Note**: Internal services use `OperatorPortCacheService`, not the REST 
resource.
 
@@ -519,7 +530,14 @@ The cache system integrates with three layers:
 - [X] Re-emit cache usage snapshots on websocket connect to refresh cache 
labels after reload
 - [X] Keep result URI hidden in the UI
 
-#### 1.4 Testing & Validation
+#### 1.4 Cache UX & Invalidation ✓ COMPLETE
+
+- [X] Show cached indicator on output ports for any cache entry
+- [X] Add context-menu actions to clear cache for selected operator and 
upstream operators (includes disabled)
+- [X] Invalidate mismatched cache entries on compile (fingerprint comparison)
+- [ ] TODO: Use source execution runtime stats for cached operator counts, 
with tuple-count fallback
+
+#### 1.5 Testing & Validation
 
 - [X] Verify downstream cached URI consumption across all operator types
 - [ ] Add integration tests: cache upsert → DB verification
diff --git 
a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
 
b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
index 615ffefb73..a5fe110d96 100644
--- 
a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
+++ 
b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
@@ -24,7 +24,7 @@ import { HttpClient, HttpParams } from "@angular/common/http";
 import { WorkflowExecutionsEntry } from 
"../../../type/workflow-executions-entry";
 import { WorkflowCacheEntry } from "../../../type/workflow-cache-entry";
 import { WorkflowRuntimeStatistics } from 
"../../../type/workflow-runtime-statistics";
-import { ExecutionState } from 
"../../../../workspace/types/execute-workflow.interface";
+import { ExecutionState, LogicalPlan } from 
"../../../../workspace/types/execute-workflow.interface";
 
 export const WORKFLOW_EXECUTIONS_API_BASE_URL = 
`${AppSettings.getApiEndpoint()}/executions`;
 
@@ -114,4 +114,20 @@ export class WorkflowExecutionsService {
   deleteWorkflowCacheEntries(wid: number): Observable<void> {
     return 
this.http.post<void>(`${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/clear`, 
{});
   }
+
+  /**
+   * Asks the backend to drop the cache entries produced by the given logical operators.
+   */
+  evictWorkflowCacheEntries(wid: number, logicalOpIds: ReadonlyArray<string>): Observable<void> {
+    const evictUrl = `${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/evict`;
+    const payload = { logicalOpIds };
+    return this.http.post<void>(evictUrl, payload);
+  }
+
+  /**
+   * Sends the logical plan to the backend so that cache entries whose fingerprints
+   * do not match it are removed.
+   */
+  invalidateWorkflowCacheEntries(wid: number, logicalPlan: LogicalPlan): Observable<void> {
+    const invalidateUrl = `${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/invalidate`;
+    return this.http.post<void>(invalidateUrl, logicalPlan);
+  }
 }
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
index 7a2e50a426..967d9e298e 100644
--- 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
@@ -21,9 +21,9 @@ import { Component, OnInit } from "@angular/core";
 import { ActivatedRoute } from "@angular/router";
 import { finalize } from "rxjs/operators";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
-import { WorkflowExecutionsService } from 
"../../../../dashboard/service/user/workflow-executions/workflow-executions.service";
 import { WorkflowCacheEntry } from 
"../../../../dashboard/type/workflow-cache-entry";
 import { CacheUsageService } from 
"../../../service/workflow-status/cache-usage.service";
+import { WorkflowCacheEntriesService } from 
"../../../service/workflow-status/workflow-cache-entries.service";
 
 /**
  * CachePanelComponent renders cache entry metadata for the current workflow.
@@ -47,7 +47,7 @@ export class CachePanelComponent implements OnInit {
   private usageKeys = new Set<string>();
 
   constructor(
-    private workflowExecutionsService: WorkflowExecutionsService,
+    private cacheEntriesService: WorkflowCacheEntriesService,
     private cacheUsageService: CacheUsageService,
     private route: ActivatedRoute
   ) {}
@@ -58,6 +58,19 @@ export class CachePanelComponent implements OnInit {
       return;
     }
     this.workflowId = workflowId;
+    this.cacheEntriesService
+      .getCacheEntriesStream()
+      .pipe(untilDestroyed(this))
+      .subscribe(entries => {
+        this.cacheEntries = [...entries];
+        this.updateVisibleEntries();
+      });
+    this.cacheEntriesService
+      .getLoadingStream()
+      .pipe(untilDestroyed(this))
+      .subscribe(loading => {
+        this.loading = loading;
+      });
     this.refresh();
     this.cacheUsageService
       .getCacheUsageStream()
@@ -77,41 +90,26 @@ export class CachePanelComponent implements OnInit {
     if (!this.workflowId) {
       return;
     }
-    this.loading = true;
-    this.workflowExecutionsService
-      .retrieveWorkflowCacheEntries(this.workflowId)
-      .pipe(
-        finalize(() => {
-          this.loading = false;
-        }),
-        untilDestroyed(this)
-      )
-      .subscribe(entries => {
-        this.cacheEntries = entries;
-        this.updateVisibleEntries();
-      });
+    
this.cacheEntriesService.refreshCacheEntries(this.workflowId).pipe(untilDestroyed(this)).subscribe();
   }
 
   /**
-   * Removes all cached outputs for the workflow and refreshes the list.
+   * Removes all cached outputs for the workflow and updates shared cache 
state.
    */
   public clearCacheEntries(): void {
     if (!this.workflowId) {
       return;
     }
     this.removing = true;
-    this.workflowExecutionsService
-      .deleteWorkflowCacheEntries(this.workflowId)
+    this.cacheEntriesService
+      .clearWorkflowCacheEntries(this.workflowId)
       .pipe(
         finalize(() => {
           this.removing = false;
         }),
         untilDestroyed(this)
       )
-      .subscribe(() => {
-        this.cacheEntries = [];
-        this.updateVisibleEntries();
-      });
+      .subscribe();
   }
 
   /**
diff --git 
a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html
 
b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html
index 7a6fcba6f2..c55c352321 100644
--- 
a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html
+++ 
b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html
@@ -123,6 +123,32 @@
       nzTheme="twotone"></span
     >remove reusing result
   </li>
+  <li
+    nz-menu-item
+    *ngIf="operatorMenuService.highlightedOperators.value.length === 1 &&
+  operatorMenuService.highlightedCommentBoxes.value.length === 0 &&
+  !hasHighlightedLinks() &&
+  isWorkflowModifiable"
+    (click)="clearCacheForSelectedOperator()">
+    <span
+      nz-icon
+      nzType="database"
+      nzTheme="outline"></span
+    >clear cache
+  </li>
+  <li
+    nz-menu-item
+    *ngIf="operatorMenuService.highlightedOperators.value.length === 1 &&
+  operatorMenuService.highlightedCommentBoxes.value.length === 0 &&
+  !hasHighlightedLinks() &&
+  isWorkflowModifiable"
+    (click)="clearCacheUpToSelectedOperator()">
+    <span
+      nz-icon
+      nzType="database"
+      nzTheme="twotone"></span
+    >clear cache up to this operator
+  </li>
   <li
     nz-menu-item
     *ngIf="(operatorMenuService.highlightedOperators.value.length > 0 ||
diff --git 
a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts
 
b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts
index 2807066b76..70e9236bc1 100644
--- 
a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts
+++ 
b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts
@@ -27,6 +27,8 @@ import { NzModalService } from "ng-zorro-antd/modal";
 import { ResultExportationComponent } from 
"../../../result-exportation/result-exportation.component";
 import { ValidationWorkflowService } from 
"src/app/workspace/service/validation/validation-workflow.service";
 import { GuiConfigService } from 
"../../../../../common/service/gui-config.service";
+import { WorkflowExecutionsService } from 
"src/app/dashboard/service/user/workflow-executions/workflow-executions.service";
+import { WorkflowCacheEntriesService } from 
"src/app/workspace/service/workflow-status/workflow-cache-entries.service";
 
 @UntilDestroy()
 @Component({
@@ -44,7 +46,9 @@ export class ContextMenuComponent {
     protected config: GuiConfigService,
     private workflowResultService: WorkflowResultService,
     private modalService: NzModalService,
-    private validationWorkflowService: ValidationWorkflowService
+    private validationWorkflowService: ValidationWorkflowService,
+    private workflowExecutionsService: WorkflowExecutionsService,
+    private cacheEntriesService: WorkflowCacheEntriesService
   ) {
     this.registerWorkflowModifiableChangedHandler();
   }
@@ -144,4 +148,72 @@ export class ContextMenuComponent {
       nzFooter: null,
     });
   }
+
+  /**
+   * Clears cached outputs produced by the selected operator.
+   * Does nothing unless the workflow has an ID and exactly one operator is selected.
+   */
+  public clearCacheForSelectedOperator(): void {
+    const workflowId = this.workflowActionService.getWorkflowMetadata()?.wid;
+    if (!workflowId || !this.hasExactlyOneOperatorSelected()) {
+      return;
+    }
+    this.evictCacheAndRefresh(workflowId, [this.getSelectedOperatorID()]);
+  }
+
+  /**
+   * Clears cached outputs produced by the selected operator and its upstream operators.
+   * Does nothing unless the workflow has an ID and exactly one operator is selected.
+   */
+  public clearCacheUpToSelectedOperator(): void {
+    const workflowId = this.workflowActionService.getWorkflowMetadata()?.wid;
+    if (!workflowId || !this.hasExactlyOneOperatorSelected()) {
+      return;
+    }
+    const operatorId = this.getSelectedOperatorID();
+    this.evictCacheAndRefresh(workflowId, this.collectUpstreamOperatorIds(operatorId));
+  }
+
+  /**
+   * Evicts the cache entries of the given operators, then reloads the shared cache entry
+   * list once the eviction request has succeeded. Shared by both context-menu actions.
+   */
+  private evictCacheAndRefresh(workflowId: number, operatorIds: ReadonlyArray<string>): void {
+    this.workflowExecutionsService
+      .evictWorkflowCacheEntries(workflowId, operatorIds)
+      .pipe(untilDestroyed(this))
+      .subscribe(() => {
+        this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe();
+      });
+  }
+
+  /**
+   * Returns the given operator plus every operator reachable by following links backwards
+   * (target to source). Operators are not filtered by enabled/disabled state here.
+   */
+  private collectUpstreamOperatorIds(operatorId: string): string[] {
+    // Reverse adjacency: target operator ID -> IDs of the operators feeding into it.
+    const sourcesByTarget = new Map<string, string[]>();
+    this.workflowActionService
+      .getTexeraGraph()
+      .getAllLinks()
+      .forEach(link => {
+        const sources = sourcesByTarget.get(link.target.operatorID);
+        if (sources) {
+          sources.push(link.source.operatorID);
+        } else {
+          sourcesByTarget.set(link.target.operatorID, [link.source.operatorID]);
+        }
+      });
+
+    const seen = new Set<string>();
+    const pending: string[] = [operatorId];
+    // Breadth-first walk; `next` is the read cursor into the growing `pending` list.
+    for (let next = 0; next < pending.length; next++) {
+      const current = pending[next];
+      if (!current || seen.has(current)) {
+        continue;
+      }
+      seen.add(current);
+      for (const upstreamId of sourcesByTarget.get(current) ?? []) {
+        if (!seen.has(upstreamId)) {
+          pending.push(upstreamId);
+        }
+      }
+    }
+    return Array.from(seen);
+  }
 }
diff --git 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
index 45b6857327..cc0126df69 100644
--- 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
+++ 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
@@ -28,9 +28,11 @@ import { fromJointPaperEvent, JointUIService, 
linkPathStrokeColor } from "../../
 import { ValidationWorkflowService } from 
"../../service/validation/validation-workflow.service";
 import { WorkflowActionService } from 
"../../service/workflow-graph/model/workflow-action.service";
 import { CacheUsageService } from 
"../../service/workflow-status/cache-usage.service";
+import { WorkflowCacheEntriesService } from 
"../../service/workflow-status/workflow-cache-entries.service";
 import { WorkflowStatusService } from 
"../../service/workflow-status/workflow-status.service";
 import { ExecutionState, OperatorState } from 
"../../types/execute-workflow.interface";
 import { LogicalPort, OperatorLink } from 
"../../types/workflow-common.interface";
+import { WorkflowCacheEntry } from 
"../../../dashboard/type/workflow-cache-entry";
 import { auditTime, filter, map, takeUntil } from "rxjs/operators";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
 import { UndoRedoService } from "../../service/undo-redo/undo-redo.service";
@@ -95,6 +97,7 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
   private currentOpenedOperatorID: string | null = null;
   private removeButton!: new () => joint.linkTools.Button;
   private breakpointButton!: new () => joint.linkTools.Button;
+  private cachedEntries: ReadonlyArray<WorkflowCacheEntry> = [];
 
   constructor(
     private workflowActionService: WorkflowActionService,
@@ -104,6 +107,7 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
     private jointUIService: JointUIService,
     private workflowStatusService: WorkflowStatusService,
     private cacheUsageService: CacheUsageService,
+    private cacheEntriesService: WorkflowCacheEntriesService,
     private executeWorkflowService: ExecuteWorkflowService,
     private nzModalService: NzModalService,
     private changeDetectorRef: ChangeDetectorRef,
@@ -166,6 +170,7 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
     this.registerPortDisplayNameChangeHandler();
     this.handleOperatorStatisticsUpdate();
     this.handleCacheUsageUpdate();
+    this.handleCacheEntriesUpdate();
     this.handleRegionEvents();
     this.handleOperatorSuggestionHighlightEvent();
     this.handleElementDelete();
@@ -358,6 +363,52 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
       });
   }
 
+  /**
+   * Keeps the cached-port indicators in sync: they are re-applied when a new cache entry
+   * snapshot arrives and when operators or ports are added or removed.
+   */
+  private handleCacheEntriesUpdate(): void {
+    const entryUpdates = this.cacheEntriesService.getCacheEntriesStream().pipe(untilDestroyed(this));
+    entryUpdates.subscribe(entries => {
+      // Remember the snapshot so graph changes can re-apply it later.
+      this.cachedEntries = entries;
+      this.applyCachedPortIndicators();
+    });
+
+    const texeraGraph = this.workflowActionService.getTexeraGraph();
+    const graphChanges = merge(
+      texeraGraph.getOperatorAddStream(),
+      texeraGraph.getOperatorDeleteStream(),
+      texeraGraph.getPortAddedOrDeletedStream()
+    ).pipe(untilDestroyed(this));
+    graphChanges.subscribe(() => {
+      this.applyCachedPortIndicators();
+    });
+  }
+
+  /**
+   * Pushes the latest cache entry snapshot to the graph: every operator receives the set of
+   * its port IDs that have a non-internal cache entry (an empty set when it has none).
+   */
+  private applyCachedPortIndicators(): void {
+    const portsByOperator = new Map<string, Set<string>>();
+    for (const entry of this.cachedEntries) {
+      if (entry.internal) {
+        continue;
+      }
+      const ports = portsByOperator.get(entry.logicalOpId) ?? new Set<string>();
+      ports.add(entry.portId.toString());
+      portsByOperator.set(entry.logicalOpId, ports);
+    }
+
+    this.workflowActionService
+      .getTexeraGraph()
+      .getAllOperators()
+      .forEach(operator => {
+        // Operators without cache entries get an empty set.
+        const cachedPorts = portsByOperator.get(operator.operatorID) ?? new Set<string>();
+        this.jointUIService.changeOperatorCachedPorts(this.paper, operator.operatorID, cachedPorts);
+      });
+  }
+
   private handleRegionEvents(): void {
     const Region = joint.dia.Element.define(
       "region",
diff --git 
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
 
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
index 9648d2b305..b2e3477926 100644
--- 
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
+++ 
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
@@ -25,7 +25,7 @@ import { AppSettings } from "../../../common/app-setting";
 import { areOperatorSchemasEqual, OperatorSchema } from 
"../../types/operator-schema.interface";
 import { ExecuteWorkflowService } from 
"../execute-workflow/execute-workflow.service";
 import { WorkflowActionService } from 
"../workflow-graph/model/workflow-action.service";
-import { catchError, debounceTime, mergeMap } from "rxjs/operators";
+import { catchError, debounceTime, map, mergeMap } from "rxjs/operators";
 import { DynamicSchemaService } from 
"../dynamic-schema/dynamic-schema.service";
 import {
   AttributeType,
@@ -42,6 +42,8 @@ import { WorkflowGraphReadonly } from 
"../workflow-graph/model/workflow-graph";
 import { serializePortIdentity } from 
"../../../common/util/port-identity-serde";
 import { addCompilationError, areAllPortSchemasEqual } from 
"../../../common/util/workflow-compilation-utils";
 import { parseLogicalOperatorPortID } from 
"../../../common/util/logical-operator-port-serde";
+import { WorkflowExecutionsService } from 
"../../../dashboard/service/user/workflow-executions/workflow-executions.service";
+import { WorkflowCacheEntriesService } from 
"../workflow-status/workflow-cache-entries.service";
 
 // endpoint for workflow compile
 export const WORKFLOW_COMPILATION_ENDPOINT = "compile";
@@ -49,10 +51,11 @@ export const WORKFLOW_COMPILATION_ENDPOINT = "compile";
 export const WORKFLOW_COMPILATION_DEBOUNCE_TIME_MS = 500;
 
 /**
- * Workflow Compiling Service provides mainly 3 functionalities:
+ * Workflow Compiling Service provides mainly 4 functionalities:
  * 1. autocomplete attribute property of operators (previously done by the 
SchemaPropagationService)
  * 2. receive static errors (previously done by sending 
EditingTimeCompilationRequest and saving in the ExecutionStateInfo)
  * 3. manage PhysicalPlan (TODO: send the physical plan to the standalone 
WorkflowExecutingService once we have it)
+ * 4. invalidate mismatched cache entries after successful compilation
  *
  * When user creates and connects operators in workflow, the 
WorkflowCompilingService's api will be triggered, which,
  * propagate the schemas, compiles the user's workflow to get the physical 
plan and static errors(if any).
@@ -73,7 +76,9 @@ export class WorkflowCompilingService {
     private httpClient: HttpClient,
     private workflowActionService: WorkflowActionService,
     private dynamicSchemaService: DynamicSchemaService,
-    private validationWorkflowService: ValidationWorkflowService
+    private validationWorkflowService: ValidationWorkflowService,
+    private workflowExecutionsService: WorkflowExecutionsService,
+    private cacheEntriesService: WorkflowCacheEntriesService
   ) {
     // Subscribe to compilation state changes to apply schema propagation
     this.compilationStateInfoChangedStream.subscribe(() => {
@@ -98,10 +103,10 @@ export class WorkflowCompilingService {
             this.validationWorkflowService.getValidTexeraGraph(),
             undefined
           );
-          return this.compile(logicalPlan);
+          return this.compile(logicalPlan).pipe(map(response => ({ response, 
logicalPlan })));
         })
       )
-      .subscribe(response => {
+      .subscribe(({ response, logicalPlan }) => {
         if (response.physicalPlan) {
           this.currentCompilationStateInfo = {
             state: CompilationState.Succeeded,
@@ -116,6 +121,7 @@ export class WorkflowCompilingService {
           };
         }
         
this.compilationStateInfoChangedStream.next(this.currentCompilationStateInfo.state);
+        this.invalidateMismatchedCacheEntries(logicalPlan);
       });
   }
 
@@ -294,6 +300,7 @@ export class WorkflowCompilingService {
       opsToReuseResult: [],
       opsToViewResult: [],
     };
+    console.log(body);
     // make a http post request to the API endpoint with the logical plan 
object
     return this.httpClient
       .post<WorkflowCompilationResponse>(
@@ -313,6 +320,32 @@ export class WorkflowCompilingService {
       );
   }
 
+  /**
+   * Triggers cache invalidation after a successful compilation.
+   * Cache entries with mismatched fingerprints are removed on the backend.
+   * Once the invalidation request emits, the shared cache entry state is refreshed.
+   *
+   * Does nothing when the workflow has no positive ID or when the current compilation
+   * state is not `Succeeded`. Failures of either request are logged as warnings and
+   * never propagated.
+   *
+   * @param logicalPlan logical plan sent along with the invalidation request
+   */
+  private invalidateMismatchedCacheEntries(logicalPlan: LogicalPlan): void {
+    const workflowId = this.workflowActionService.getWorkflowMetadata().wid;
+    if (
+      workflowId === undefined ||
+      workflowId <= 0 ||
+      this.currentCompilationStateInfo.state !== CompilationState.Succeeded
+    ) {
+      return;
+    }
+    this.workflowExecutionsService
+      .invalidateWorkflowCacheEntries(workflowId, logicalPlan)
+      .pipe(
+        catchError(err => {
+          console.warn("cache invalidation failed during compilation", err);
+          return EMPTY;
+        }),
+        // Chain the refresh instead of subscribing inside a subscribe callback, and give it
+        // its own error handler so a failed refresh does not surface as an unhandled error.
+        mergeMap(() =>
+          this.cacheEntriesService.refreshCacheEntries(workflowId).pipe(
+            catchError(err => {
+              console.warn("cache entry refresh failed after invalidation", err);
+              return EMPTY;
+            })
+          )
+        )
+      )
+      .subscribe();
+  }
+
   public static setOperatorInputAttrs(
     operatorSchema: OperatorSchema,
     inputPortSchemaMap: OperatorPortSchemaMap | undefined
diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts 
b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
index c174e63958..7f098a3e10 100644
--- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
+++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
@@ -425,6 +425,30 @@ export class JointUIService {
       }
     });
   }
+
+  /**
+   * Shows or hides the cache indicator ring of each output port of an operator,
+   * without changing counts or labels.
+   *
+   * A port is matched against `cachedPortIds` by the segment that follows the first "-"
+   * in its JointJS port ID, or by the whole ID when it contains no "-".
+   *
+   * @param jointPaper paper that holds the operator element
+   * @param operatorID ID of the operator whose output ports are updated
+   * @param cachedPortIds keys of the cached ports; when omitted, every indicator is hidden
+   */
+  public changeOperatorCachedPorts(
+    jointPaper: joint.dia.Paper,
+    operatorID: string,
+    cachedPortIds?: Set<string>
+  ): void {
+    const element = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model;
+    if (!element) {
+      return;
+    }
+    for (const portDef of element.getPorts()) {
+      const portId = portDef.id;
+      // Only output ports that actually carry an ID get an indicator update.
+      if (portDef.group !== "out" || portId == null) {
+        continue;
+      }
+      const segments = portId.split("-");
+      let portKey = portId;
+      if (segments.length > 1) {
+        portKey = segments[1];
+      }
+      const isCached = cachedPortIds != null && cachedPortIds.has(portKey);
+      element.portProp(portId, "attrs/.port-cache-indicator/display", isCached ? "block" : "none");
+    }
+  }
   public foldOperatorDetails(jointPaper: joint.dia.Paper, operatorID: string): 
void {
     jointPaper.getModelById(operatorID).attr({
       [`.${operatorStateClass}`]: { visibility: "hidden" },
@@ -679,6 +703,13 @@ export class JointUIService {
         r: 5,
         stroke: "none",
       },
+      ".port-cache-indicator": {
+        fill: "none",
+        r: 7,
+        stroke: "#1890ff",
+        "stroke-width": 1.5,
+        display: "none",
+      },
       ".port-label": {
         visibility: "visible",
         event: "input-label:evt",
@@ -696,6 +727,13 @@ export class JointUIService {
    */
   public static getCustomPortMarkup(): any[] {
     return [
+      {
+        tagName: "circle",
+        selector: ".port-cache-indicator",
+        attributes: {
+          class: "port-cache-indicator",
+        },
+      },
       {
         tagName: "circle",
         selector: ".port-body",
diff --git 
a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
 
b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
new file mode 100644
index 0000000000..b2f5ca0597
--- /dev/null
+++ 
b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Injectable } from "@angular/core";
+import { BehaviorSubject, Observable } from "rxjs";
+import { finalize, tap } from "rxjs/operators";
+import { WorkflowExecutionsService } from 
"../../../dashboard/service/user/workflow-executions/workflow-executions.service";
+import { WorkflowCacheEntry } from 
"../../../dashboard/type/workflow-cache-entry";
+import { WorkflowActionService } from 
"../workflow-graph/model/workflow-action.service";
+
+/**
+ * Shares workflow cache entry state across components and keeps it in sync with the backend.
+ *
+ * The service tracks the active workflow through the metadata stream of WorkflowActionService:
+ * entries are reloaded when the workflow ID changes and cleared when the workflow has no
+ * positive ID.
+ */
+@Injectable({
+  providedIn: "root",
+})
+export class WorkflowCacheEntriesService {
+  private readonly cacheEntriesSubject = new BehaviorSubject<ReadonlyArray<WorkflowCacheEntry>>([]);
+  private readonly loadingSubject = new BehaviorSubject<boolean>(false);
+  // ID of the workflow whose entries are published; undefined while no workflow with a positive ID is active.
+  private currentWorkflowId?: number;
+
+  constructor(
+    private workflowExecutionsService: WorkflowExecutionsService,
+    private workflowActionService: WorkflowActionService
+  ) {
+    this.workflowActionService.workflowMetaDataChanged().subscribe(metadata => {
+      const workflowId = metadata.wid ?? 0;
+      if (workflowId <= 0) {
+        this.currentWorkflowId = undefined;
+        this.cacheEntriesSubject.next([]);
+        return;
+      }
+      this.loadEntriesIfWorkflowChanged(workflowId);
+    });
+
+    // The metadata may already be set before this service is constructed.
+    const initialMetadata = this.workflowActionService.getWorkflowMetadata();
+    const initialWorkflowId = initialMetadata?.wid ?? 0;
+    if (initialWorkflowId > 0) {
+      this.loadEntriesIfWorkflowChanged(initialWorkflowId);
+    }
+  }
+
+  /**
+   * Returns a stream of cache entries for the active workflow.
+   */
+  public getCacheEntriesStream(): Observable<ReadonlyArray<WorkflowCacheEntry>> {
+    return this.cacheEntriesSubject.asObservable();
+  }
+
+  /**
+   * Returns a stream of cache entry loading state.
+   */
+  public getLoadingStream(): Observable<boolean> {
+    return this.loadingSubject.asObservable();
+  }
+
+  /**
+   * Returns the latest cache entry snapshot.
+   */
+  public getCacheEntriesSnapshot(): ReadonlyArray<WorkflowCacheEntry> {
+    return this.cacheEntriesSubject.value;
+  }
+
+  /**
+   * Refreshes cache entries for a workflow and updates shared state.
+   *
+   * A positive `workflowId` becomes the active workflow ID immediately. The request is sent,
+   * and the loading state is raised, only when the returned Observable is subscribed. The
+   * shared state is updated only if `workflowId` is still the active workflow when the
+   * response arrives, so a late response cannot overwrite the entries of another workflow;
+   * the subscriber still receives the entries either way.
+   *
+   * @param workflowId Workflow ID to refresh entries for
+   * @returns cold Observable emitting the entries retrieved from the backend
+   */
+  public refreshCacheEntries(workflowId: number): Observable<WorkflowCacheEntry[]> {
+    if (workflowId > 0 && workflowId !== this.currentWorkflowId) {
+      this.currentWorkflowId = workflowId;
+    }
+    return new Observable<WorkflowCacheEntry[]>(subscriber => {
+      // Raise the loading flag on subscription so an unsubscribed result cannot leave it stuck.
+      this.loadingSubject.next(true);
+      return this.workflowExecutionsService
+        .retrieveWorkflowCacheEntries(workflowId)
+        .pipe(
+          tap(entries => {
+            if (workflowId === this.currentWorkflowId) {
+              this.cacheEntriesSubject.next(entries);
+            }
+          }),
+          finalize(() => this.loadingSubject.next(false))
+        )
+        .subscribe(subscriber);
+    });
+  }
+
+  /**
+   * Clears all cached outputs for a workflow and updates shared state.
+   *
+   * @param workflowId Workflow ID whose cache entries should be removed
+   */
+  public clearWorkflowCacheEntries(workflowId: number): Observable<void> {
+    return this.workflowExecutionsService
+      .deleteWorkflowCacheEntries(workflowId)
+      .pipe(tap(() => this.cacheEntriesSubject.next([])));
+  }
+
+  /**
+   * Returns cached output port IDs for a logical operator.
+   * Entries flagged as `internal` are excluded.
+   *
+   * @param operatorId Logical operator ID
+   */
+  public getCachedOutputPorts(operatorId: string): Set<string> {
+    const cachedPorts = new Set<string>();
+    this.cacheEntriesSubject.value
+      .filter(entry => entry.logicalOpId === operatorId && !entry.internal)
+      .forEach(entry => cachedPorts.add(entry.portId.toString()));
+    return cachedPorts;
+  }
+
+  /**
+   * Marks `workflowId` as the active workflow and loads its entries, unless it is already active.
+   * Load failures are logged instead of being left as unhandled errors.
+   */
+  private loadEntriesIfWorkflowChanged(workflowId: number): void {
+    if (workflowId === this.currentWorkflowId) {
+      return;
+    }
+    this.currentWorkflowId = workflowId;
+    this.refreshCacheEntries(workflowId).subscribe({
+      error: (err: unknown) => console.warn("failed to load workflow cache entries", err),
+    });
+  }
+}


Reply via email to