This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit c600cc1531b1234bf723d1ed195b711df4d31ef0
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Feb 9 21:43:16 2026 -0800

    feat(cache): clean up cache and execution resources in the lifecycle manager
---
 .../texera/web/dao/OperatorPortCacheDao.scala      | 60 ++++++++++++++++++++++
 .../user/workflow/WorkflowExecutionsResource.scala | 43 ++++++++++++++--
 .../web/service/OperatorPortCacheService.scala     | 48 +++++++++++++++++
 .../web/service/WorkflowExecutionService.scala     | 16 ++++++
 .../texera/web/service/WorkflowService.scala       | 50 ++++++++++--------
 docs/operator-port-cache.md                        |  9 +++-
 .../workflow-executions.service.ts                 | 11 +++-
 .../cache-panel/cache-panel.component.ts           |  3 ++
 .../compile-workflow/workflow-compiling.service.ts | 15 ++----
 9 files changed, 217 insertions(+), 38 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala 
b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
index 7476a99d3c..d656ae6fea 100644
--- a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
+++ b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
@@ -154,6 +154,47 @@ class OperatorPortCacheDao(sqlServer: SqlServer) {
       .toSeq
   }
 
+  /**
+    * List cache entries produced by the specified source execution IDs.
+    *
+    * @param sourceExecutionIds execution IDs that produced cached outputs
+    * @return cache entries with matching source_execution_id
+    */
+  def listBySourceExecutionIds(sourceExecutionIds: Seq[Long]): 
Seq[OperatorPortCacheRecord] = {
+    val executionIds = sourceExecutionIds.distinct
+    if (executionIds.isEmpty) {
+      return Seq.empty
+    }
+    context
+      .select(
+        OPERATOR_PORT_CACHE.WORKFLOW_ID,
+        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
+        OPERATOR_PORT_CACHE.SUBDAG_HASH,
+        OPERATOR_PORT_CACHE.FINGERPRINT_JSON,
+        OPERATOR_PORT_CACHE.RESULT_URI,
+        OPERATOR_PORT_CACHE.TUPLE_COUNT,
+        OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID,
+        OPERATOR_PORT_CACHE.UPDATED_AT
+      )
+      .from(OPERATOR_PORT_CACHE)
+      
.where(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID.in(executionIds.map(Long.box).asJava))
+      .fetch()
+      .asScala
+      .map(record =>
+        OperatorPortCacheRecord(
+          workflowId = record.value1().longValue(),
+          globalPortId = record.value2(),
+          subdagHash = record.value3(),
+          fingerprintJson = record.value4(),
+          resultUri = URI.create(record.value5()),
+          tupleCount = Option(record.value6()).map(_.longValue()),
+          sourceExecutionId = Option(record.value7()).map(_.longValue()),
+          updatedAt = Option(record.value8())
+        )
+      )
+      .toSeq
+  }
+
   /**
     * Insert or update a cache entry (upsert).
     * On conflict (workflow_id, global_port_id, subdag_hash), updates the 
existing record and refreshes updated_at.
@@ -200,6 +241,25 @@ class OperatorPortCacheDao(sqlServer: SqlServer) {
       .execute()
   }
 
+  /**
+    * Delete cache entries by source execution IDs.
+    * This removes cache metadata whose backing result artifacts were produced 
by
+    * executions that are being cleaned up.
+    *
+    * @param sourceExecutionIds Execution IDs that produced the cached outputs
+    * @return Number of rows deleted
+    */
+  def deleteBySourceExecutionIds(sourceExecutionIds: Seq[Long]): Int = {
+    val executionIds = sourceExecutionIds.distinct
+    if (executionIds.isEmpty) {
+      return 0
+    }
+    context
+      .deleteFrom(OPERATOR_PORT_CACHE)
+      
.where(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID.in(executionIds.map(Long.box).asJava))
+      .execute()
+  }
+
   /**
     * Delete cache entries for a workflow by global port IDs.
     * This removes all hashes for the specified output ports.
diff --git 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
index 00a50231cf..6b0df8fb46 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
@@ -118,6 +118,26 @@ object WorkflowExecutionsResource {
     }
   }
 
+  /**
+    * Retrieves all execution IDs of a workflow for a computing unit, ordered 
by latest first.
+    *
+    * @param wid workflow id
+    * @param cuid computing unit id
+    * @return list of execution ids
+    */
+  def getExecutionIDs(wid: Integer, cuid: Integer): List[Integer] = {
+    context
+      .select(WORKFLOW_EXECUTIONS.EID)
+      .from(WORKFLOW_EXECUTIONS)
+      .join(WORKFLOW_VERSION)
+      .on(WORKFLOW_EXECUTIONS.VID.eq(WORKFLOW_VERSION.VID))
+      
.where(WORKFLOW_VERSION.WID.eq(wid).and(WORKFLOW_EXECUTIONS.CUID.eq(cuid)))
+      .orderBy(WORKFLOW_EXECUTIONS.EID.desc())
+      .fetchInto(classOf[Integer])
+      .asScala
+      .toList
+  }
+
   /**
     * Computes which operators in a workflow are restricted due to dataset 
access controls.
     *
@@ -380,13 +400,21 @@ object WorkflowExecutionsResource {
 
   /**
     * Removes all resources related to the specified execution IDs,
-    * including runtime statistics, console messages, result documents, and 
database records.
+    * including runtime statistics, console messages, result documents, cache 
metadata,
+    * and execution database records.
     *
     * @param eids Array of execution IDs to be cleaned up.
     */
   def removeAllExecutionFiles(eids: Array[Integer]): Unit = {
     val eIdsLong = eids.map(_.toLong)
     val eIdsList = eIdsLong.toSeq.asJava
+    val executionIds = eIdsLong.toIndexedSeq.map(ExecutionIdentity(_))
+
+    // Remove cache entries that reference results produced by these 
executions.
+    val cacheService = new OperatorPortCacheService(
+      new OperatorPortCacheDao(SqlServer.getInstance())
+    )
+    cacheService.invalidateCacheBySourceExecutions(executionIds)
 
     // Collect all related document URIs (runtime stats, console logs, results)
     val uris: Seq[URI] = eIdsLong.toIndexedSeq.flatMap { eid =>
@@ -599,6 +627,13 @@ case class ExecutionRenameRequest(wid: Integer, eId: 
Integer, executionName: Str
   */
 case class CacheEvictionRequest(logicalOpIds: List[String])
 
+/**
+  * Response payload for cache invalidation endpoints.
+  *
+  * @param removedCount Number of cache entries removed
+  */
+case class CacheInvalidationResponse(removedCount: Int)
+
 @Produces(Array(MediaType.APPLICATION_JSON, 
MediaType.APPLICATION_OCTET_STREAM, "application/zip"))
 @Path("/executions")
 class WorkflowExecutionsResource {
@@ -893,7 +928,7 @@ class WorkflowExecutionsResource {
       request: LogicalPlanPojo,
       @PathParam("wid") wid: Integer,
       @Auth sessionUser: SessionUser
-  ): Unit = {
+  ): CacheInvalidationResponse = {
     validateUserCanWriteWorkflow(sessionUser.getUser.getUid, wid)
     val workflow = try {
       val workflowContext = new WorkflowContext(workflowId = 
WorkflowIdentity(wid.toLong))
@@ -904,7 +939,9 @@ class WorkflowExecutionsResource {
     }
     val dao = new OperatorPortCacheDao(SqlServer.getInstance())
     val cacheService = new OperatorPortCacheService(dao)
-    
cacheService.invalidateMismatchedCacheEntries(WorkflowIdentity(wid.toLong), 
workflow.physicalPlan)
+    val removedCount =
+      
cacheService.invalidateMismatchedCacheEntries(WorkflowIdentity(wid.toLong), 
workflow.physicalPlan)
+    CacheInvalidationResponse(removedCount)
   }
 
   /**
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
index e85afef1c5..5391d6d1ee 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
@@ -49,6 +49,17 @@ import scala.util.Try
 class OperatorPortCacheService(dao: OperatorPortCacheDao) {
   private val context = SqlServer.getInstance().createDSLContext()
 
+  /**
+    * Result of cache invalidation by source executions.
+    *
+    * @param deletedRows number of rows removed from operator_port_cache
+    * @param deletedResultUris cached result URIs deleted/cleared during 
invalidation
+    */
+  case class SourceExecutionInvalidationResult(
+      deletedRows: Int,
+      deletedResultUris: Set[URI]
+  )
+
   /**
     * Lookup cached outputs for all materializable ports in the physical plan.
     * Called at workflow submission time by WorkflowExecutionService.
@@ -137,6 +148,43 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) {
     deleteCacheEntriesByPorts(workflowId, cacheEntries)
   }
 
+  /**
+    * Invalidate cache entries produced by the specified executions.
+    *
+    * This removes cache metadata as well as cache-linked storage artifacts:
+    * - operator_port_executions rows referencing cached URIs
+    * - cached result documents
+    *
+    * @param executionIds Execution IDs whose produced cache entries should be 
deleted
+    * @return Number of cache rows deleted
+    */
+  def invalidateCacheBySourceExecutions(executionIds: Seq[ExecutionIdentity]): 
Int = {
+    invalidateCacheBySourceExecutionsWithArtifacts(executionIds).deletedRows
+  }
+
+  /**
+    * Invalidate cache entries produced by the specified executions and return 
deleted URI artifacts.
+    *
+    * This helper is used by lifecycle cleanup to avoid re-clearing cached 
documents.
+    *
+    * @param executionIds Execution IDs whose produced cache entries should be 
deleted
+    * @return deleted rows and deleted result URIs
+    */
+  def invalidateCacheBySourceExecutionsWithArtifacts(
+      executionIds: Seq[ExecutionIdentity]
+  ): SourceExecutionInvalidationResult = {
+    val sourceExecutionIds = executionIds.map(_.id)
+    val cacheEntries = dao.listBySourceExecutionIds(sourceExecutionIds)
+    if (cacheEntries.isEmpty) {
+      return SourceExecutionInvalidationResult(0, Set.empty)
+    }
+    val resultUris = cacheEntries.map(_.resultUri).distinct
+    deleteOperatorPortResultsByUris(resultUris)
+    val deleted = dao.deleteBySourceExecutionIds(sourceExecutionIds)
+    clearCachedResultDocuments(resultUris)
+    SourceExecutionInvalidationResult(deleted, resultUris.toSet)
+  }
+
   /**
     * Invalidate cache entries for outputs owned by the provided logical 
operators.
     * This is used by editor actions that evict cache for selected operators.
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
index 9a9a8a11b9..98cb7aad8b 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
@@ -57,6 +57,22 @@ object WorkflowExecutionService {
       .getLatestExecutionID(workflowId.id.toInt, computingUnitId)
       .map(eid => new ExecutionIdentity(eid.longValue()))
   }
+
+  /**
+    * Retrieve all execution IDs for a workflow and computing unit.
+    *
+    * @param workflowId workflow identity
+    * @param computingUnitId computing unit id
+    * @return execution IDs ordered by newest first
+    */
+  def getExecutionIds(
+      workflowId: WorkflowIdentity,
+      computingUnitId: Int
+  ): Seq[ExecutionIdentity] = {
+    WorkflowExecutionsResource
+      .getExecutionIDs(workflowId.id.toInt, computingUnitId)
+      .map(eid => new ExecutionIdentity(eid.longValue()))
+  }
 }
 
 class WorkflowExecutionService(
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index 2b5abd8592..c1f7f9ad8c 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -117,12 +117,9 @@ class WorkflowService(
     s"workflowId=$workflowId",
     cleanUpTimeout,
     () => {
-      // clear the storage resources associated with the latest execution
-      WorkflowExecutionService
-        .getLatestExecutionId(workflowId, computingUnitId)
-        .foreach(eid => {
-//          clearExecutionResources(eid)
-        })
+      // Clear execution-scoped artifacts (runtime stats, result/console docs) 
for all executions.
+      val executionIds = WorkflowExecutionService.getExecutionIds(workflowId, 
computingUnitId)
+      clearExecutionResources(executionIds)
       
WorkflowService.workflowServiceMapping.remove(mkWorkflowStateId(workflowId))
       if (executionService.getValue != null) {
         // shutdown client
@@ -327,27 +324,40 @@ class WorkflowService(
   }
 
   /**
-    * Cleans up all resources associated with a workflow execution.
+    * Cleans up all resources associated with workflow executions.
     *
     * This method performs resource cleanup in the following sequence:
-    *  1. Retrieves all document URIs associated with the execution
-    *  2. Clears URI references from the execution registry
-    *  3. Safely clears all result and console message documents
-    *  4. Expires Iceberg snapshots for runtime statistics
-    *  5. Deletes large binaries from MinIO
+    *  1. Retrieves all document URIs associated with the executions
+    *  2. Invalidates cache entries produced by these executions (cache rows + 
cached docs + cache-linked operator_port_executions rows)
+    *  3. Clears URI references from the execution registry
+    *  4. Safely clears all result and console message documents
+    *  5. Expires Iceberg snapshots for runtime statistics
+    *  6. Deletes large binaries from MinIO
     *
-    * @param eid The execution identity to clean up resources for
+    * @param executionIds execution identities to clean up resources for
     */
-  private def clearExecutionResources(eid: ExecutionIdentity): Unit = {
-    // Retrieve URIs for all resources associated with this execution
-    val resultUris = WorkflowExecutionsResource.getResultUrisByExecutionId(eid)
-    val consoleMessagesUris = 
WorkflowExecutionsResource.getConsoleMessagesUriByExecutionId(eid)
+  private def clearExecutionResources(executionIds: Seq[ExecutionIdentity]): 
Unit = {
+    if (executionIds.isEmpty) {
+      return
+    }
+
+    val runtimeStatsUris =
+      executionIds.flatMap(eid => 
WorkflowExecutionsResource.getRuntimeStatsUriByExecutionId(eid).toList)
+
+    // Invalidate cache artifacts produced by these executions.
+    val cacheInvalidation = 
cacheService.invalidateCacheBySourceExecutionsWithArtifacts(executionIds)
+
+    val resultUris = executionIds
+      .flatMap(WorkflowExecutionsResource.getResultUrisByExecutionId)
+      .filterNot(cacheInvalidation.deletedResultUris.contains)
+    val consoleMessagesUris =
+      
executionIds.flatMap(WorkflowExecutionsResource.getConsoleMessagesUriByExecutionId)
 
     // Remove references from registry first
-    WorkflowExecutionsResource.deleteConsoleMessageAndExecutionResultUris(eid)
+    
executionIds.foreach(WorkflowExecutionsResource.deleteConsoleMessageAndExecutionResultUris)
 
     // Clean up all result and console message documents
-    (resultUris ++ consoleMessagesUris).foreach { uri =>
+    (resultUris ++ consoleMessagesUris).distinct.foreach { uri =>
       try DocumentFactory.openDocument(uri)._1.clear()
       catch {
         case error: Throwable =>
@@ -356,7 +366,7 @@ class WorkflowService(
     }
 
     // Expire any Iceberg snapshots for runtime statistics
-    WorkflowExecutionsResource.getRuntimeStatsUriByExecutionId(eid).foreach { 
uri =>
+    runtimeStatsUris.distinct.foreach { uri =>
       try {
         DocumentFactory.openDocument(uri)._1 match {
           case iceberg: OnIceberg => iceberg.expireSnapshots()
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index 7e4c27defd..56eaf5a3f7 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -214,7 +214,11 @@ Entry point: `RegionExecutionCoordinator` constructor 
branches on `region.cached
 
 - Existing protobuf messages (`OperatorStatistics`) handle cached regions 
naturally
 - No `from_cache` flag required (can infer from numWorkers=0 + instant 
completion)
-- Lifecycle management (eviction, cleanup) deferred to future work
+- Cost-aware lifecycle management (eviction policy) deferred to future work
+- Lifecycle timeout cleanup clears execution-scoped artifacts (result/console/runtime stats) for all executions of the workflow on the current computing unit
+- Lifecycle timeout cleanup clears cached artifacts produced by those executions via `clearExecutionResources` (single cleanup path), including cache rows, associated `operator_port_executions` rows, and cached result documents
+- Lifecycle timeout cleanup excludes cached result URIs already removed by cache invalidation to avoid double-clearing documents
+- Bulk execution deletion (`removeAllExecutionFiles`) invalidates cache entries by `source_execution_id`, removing cache rows, associated `operator_port_executions` rows, and cached result documents, so no dangling cache entries remain after execution artifacts are deleted
 
 **Cache metadata UI (Implemented)**:
 
@@ -396,6 +400,9 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
   - `OperatorPortCacheService`: High-level cache operations 
(lookupCachedOutputs, upsertCachedOutput, invalidateWorkflowCache)
   - `ExecutionCacheService`: Event listener bridging controller events to 
service layer
   - Event-based communication via `PortMaterialized` event and 
`client.registerCallback[T]`
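+  - Execution cleanup integration:
+    - lifecycle timeout (cuid-scoped) clears cache artifacts via `invalidateCacheBySourceExecutionsWithArtifacts` (called from `WorkflowService.clearExecutionResources`)
+    - bulk execution deletion clears cache artifacts via `invalidateCacheBySourceExecutions` (called from `WorkflowExecutionsResource.removeAllExecutionFiles`)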
 - **Fingerprinting**: `FingerprintUtil` implemented with workflow-based specs 
for deterministic subDAG hashing
 - **Submission-time lookup**: `WorkflowExecutionService` uses 
`OperatorPortCacheService.lookupCachedOutputs()` to compute fingerprints for 
all physical output ports, queries cache, stores hits in 
`WorkflowSettings.cachedOutputs`
 - **Cache persistence**: `PortCompletedHandler` emits `PortMaterialized` event 
→ `ExecutionCacheService` → `OperatorPortCacheService.upsertCachedOutput()` → 
`OperatorPortCacheDao.upsert()` (includes fingerprint, URI, tuple count)
diff --git 
a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
 
b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
index a5fe110d96..c787a7fb51 100644
--- 
a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
+++ 
b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
@@ -28,6 +28,10 @@ import { ExecutionState, LogicalPlan } from 
"../../../../workspace/types/execute
 
 export const WORKFLOW_EXECUTIONS_API_BASE_URL = 
`${AppSettings.getApiEndpoint()}/executions`;
 
+export type CacheInvalidationResponse = Readonly<{
+  removedCount: number;
+}>;
+
 @Injectable({
   providedIn: "root",
 })
@@ -127,7 +131,10 @@ export class WorkflowExecutionsService {
   /**
    * Invalidates cache entries whose fingerprints do not match the provided 
logical plan.
    */
-  invalidateWorkflowCacheEntries(wid: number, logicalPlan: LogicalPlan): 
Observable<void> {
-    return 
this.http.post<void>(`${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/invalidate`,
 logicalPlan);
+  invalidateWorkflowCacheEntries(wid: number, logicalPlan: LogicalPlan): 
Observable<CacheInvalidationResponse | null> {
+    return this.http.post<CacheInvalidationResponse | null>(
+      `${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/invalidate`,
+      logicalPlan
+    );
   }
 }
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
index f44d48f6b9..5e9e0b9d18 100644
--- 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
@@ -71,6 +71,9 @@ export class CachePanelComponent implements OnInit {
       return;
     }
     this.workflowId = workflowId;
+    // Invalidation notices are compile-time, session-local feedback.
+    // Clear stale notices when opening/reloading the workflow cache panel.
+    this.cacheEntriesService.clearInvalidationNotice();
     this.cacheEntriesService
       .getCacheEntriesStream()
       .pipe(untilDestroyed(this))
diff --git 
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
 
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
index d2beccb2ee..b43cca630e 100644
--- 
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
+++ 
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
@@ -337,9 +337,6 @@ export class WorkflowCompilingService {
     if (!this.cacheEntriesService.isAutoInvalidationEnabled()) {
       return;
     }
-    const beforeKeys = new Set(
-      this.cacheEntriesService.getCacheEntriesSnapshot().map(entry => 
this.cacheEntriesService.buildEntryKey(entry))
-    );
     this.workflowExecutionsService
       .invalidateWorkflowCacheEntries(workflowId, logicalPlan)
       .pipe(
@@ -348,15 +345,9 @@ export class WorkflowCompilingService {
           return EMPTY;
         })
       )
-      .subscribe(() => {
-        
this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(entries => {
-          const afterKeys = new Set(entries.map(entry => 
this.cacheEntriesService.buildEntryKey(entry)));
-          let removedCount = 0;
-          beforeKeys.forEach(key => {
-            if (!afterKeys.has(key)) {
-              removedCount += 1;
-            }
-          });
+      .subscribe(response => {
+        const removedCount = response?.removedCount ?? 0;
+        this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(() 
=> {
           this.cacheEntriesService.notifyInvalidation(workflowId, 
removedCount);
         });
       });

Reply via email to