This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 7858f5cfc148f7a7fc6617d2aa4a180234b1a439
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Thu Jan 15 09:52:29 2026 -0800

    feat(cache): add clearing cache for a workflow.
---
 .../user/workflow/WorkflowExecutionsResource.scala | 55 +++++++++++++++++++++-
 .../web/service/OperatorPortCacheService.scala     | 50 ++++++++++++++++++--
 docs/operator-port-cache.md                        | 11 +++--
 .../workflow-executions.service.ts                 |  7 +++
 .../cache-panel/cache-panel.component.html         | 11 +++++
 .../cache-panel/cache-panel.component.ts           | 24 ++++++++++
 6 files changed, 148 insertions(+), 10 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
index 52d3f43123..70fbdd1883 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
@@ -45,7 +45,11 @@ import 
org.apache.texera.dao.jooq.generated.tables.pojos.{WorkflowExecutions, Us
 import org.apache.texera.web.dao.OperatorPortCacheDao
 import org.apache.texera.web.model.http.request.result.ResultExportRequest
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._
-import org.apache.texera.web.service.{ExecutionsMetadataPersistService, 
ResultExportService}
+import org.apache.texera.web.service.{
+  ExecutionsMetadataPersistService,
+  OperatorPortCacheService,
+  ResultExportService
+}
 import org.jooq.DSLContext
 import play.api.libs.json.Json
 
@@ -815,6 +819,49 @@ class WorkflowExecutionsResource {
       .toList
   }
 
+  /**
+    * Removes all cached outputs for a workflow and deletes their stored result documents.
+    *
+    * This also deletes operator_port_executions rows that reference the cached result URIs.
+    * The cleanup is delegated to OperatorPortCacheService for consistency.
+    *
+    * Exposed as `DELETE /{wid}/cache`. All work is done by clearWorkflowCacheEntriesInternal,
+    * which checks write access to the workflow before anything is cleared.
+    *
+    * @param wid         id of the workflow whose cache is cleared (taken from the URL path)
+    * @param sessionUser authenticated user issuing the request
+    * @throws WebApplicationException with status UNAUTHORIZED when the user lacks write access
+    */
+  @DELETE
+  @Path("/{wid}/cache")
+  @RolesAllowed(Array("REGULAR", "ADMIN"))
+  def deleteWorkflowCacheEntries(
+      @PathParam("wid") wid: Integer,
+      @Auth sessionUser: SessionUser
+  ): Unit = {
+    clearWorkflowCacheEntriesInternal(wid, sessionUser)
+  }
+
+  /**
+    * Clears cached outputs using POST for environments that block DELETE.
+    *
+    * Exposed as `POST /{wid}/cache/clear`. It calls the same
+    * clearWorkflowCacheEntriesInternal handler as the DELETE endpoint, so the
+    * write-access check and the cleanup are identical.
+    *
+    * @param wid         id of the workflow whose cache is cleared (taken from the URL path)
+    * @param sessionUser authenticated user issuing the request
+    * @throws WebApplicationException with status UNAUTHORIZED when the user lacks write access
+    */
+  @POST
+  @Path("/{wid}/cache/clear")
+  @RolesAllowed(Array("REGULAR", "ADMIN"))
+  def clearWorkflowCacheEntries(
+      @PathParam("wid") wid: Integer,
+      @Auth sessionUser: SessionUser
+  ): Unit = {
+    clearWorkflowCacheEntriesInternal(wid, sessionUser)
+  }
+
+  /**
+    * Common implementation behind the DELETE and POST cache-clearing endpoints.
+    *
+    * First verifies that the requesting user may modify the workflow, then asks a
+    * freshly built OperatorPortCacheService to invalidate that workflow's cache.
+    *
+    * @param wid         id of the workflow whose cache is cleared
+    * @param sessionUser authenticated user issuing the request
+    */
+  private def clearWorkflowCacheEntriesInternal(
+      wid: Integer,
+      sessionUser: SessionUser
+  ): Unit = {
+    val requesterUid = sessionUser.getUser.getUid
+    validateUserCanWriteWorkflow(requesterUid, wid)
+
+    val cacheService =
+      new OperatorPortCacheService(new OperatorPortCacheDao(SqlServer.getInstance()))
+    cacheService.invalidateWorkflowCache(WorkflowIdentity(wid.toLong))
+  }
+
   /** Sets a group of executions' bookmarks to the payload passed in the body. 
*/
   @PUT
   @Consumes(Array(MediaType.APPLICATION_JSON))
@@ -850,6 +897,12 @@ class WorkflowExecutionsResource {
       throw new WebApplicationException(Response.Status.UNAUTHORIZED)
   }
 
+  /**
+    * Ensures the user is authorized to modify the workflow.
+    *
+    * Throws a WebApplicationException with status UNAUTHORIZED (401) when
+    * WorkflowAccessResource.hasWriteAccess(wid, uid) is false; otherwise returns normally.
+    */
+  private def validateUserCanWriteWorkflow(uid: Integer, wid: Integer): Unit = {
+    val mayWrite = WorkflowAccessResource.hasWriteAccess(wid, uid)
+    if (!mayWrite) {
+      throw new WebApplicationException(Response.Status.UNAUTHORIZED)
+    }
+  }
+
   /** Delete a group of executions */
   @PUT
   @Consumes(Array(MediaType.APPLICATION_JSON))
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
index 9e0155e3a4..82aef5f4c8 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
@@ -23,9 +23,13 @@ import 
org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, Workflow
 import org.apache.texera.amber.core.workflow.cache.FingerprintUtil
 import org.apache.texera.amber.core.workflow.{CachedOutput, 
GlobalPortIdentity, PhysicalPlan}
 import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_EXECUTIONS
 import org.apache.texera.web.dao.{OperatorPortCacheDao, 
OperatorPortCacheRecord}
+import org.apache.texera.amber.core.storage.DocumentFactory
 
 import java.net.URI
+import scala.jdk.CollectionConverters._
 
 /**
   * Service for operator port result caching.
@@ -40,6 +44,7 @@ import java.net.URI
   * @param dao OperatorPortCacheDao for database access
   */
 class OperatorPortCacheService(dao: OperatorPortCacheDao) {
+  private val context = SqlServer.getInstance().createDSLContext()
 
   /**
     * Lookup cached outputs for all materializable ports in the physical plan.
@@ -113,16 +118,51 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) 
{
   }
 
   /**
-    * Invalidate all cache entries for a workflow.
-    * Useful for:
-    * - Manual cache clearing via REST API
-    * - Workflow deletion (cleanup)
-    * - Testing
+    * Invalidate all cache entries for a workflow and remove associated result 
artifacts.
+    * This is used for manual cache clearing, workflow deletion, and testing.
+    *
+    * Steps:
+    * 1. Collect cached result URIs
+    * 2. Remove operator_port_executions rows referencing those URIs
+    * 3. Delete cache entries
+    * 4. Clear stored result documents (best-effort)
     *
     * @param workflowId Workflow ID whose cache entries should be deleted
     */
   def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit = {
+    // Capture the result URIs before the cache rows are deleted; they are needed afterwards.
+    val cachedUris =
+      dao.listByWorkflow(workflowId.id, Int.MaxValue, 0).map(_.resultUri).distinct
+    deleteOperatorPortResultsByUris(cachedUris)
     dao.deleteByWorkflow(workflowId.id)
+    clearCachedResultDocuments(cachedUris)
+  }
+
+  /**
+    * Removes every operator_port_executions row whose result URI is one of `resultUris`.
+    * When `resultUris` is empty no delete statement is issued.
+    *
+    * @param resultUris result URIs whose referencing rows should be deleted
+    */
+  private def deleteOperatorPortResultsByUris(resultUris: Seq[URI]): Unit = {
+    if (resultUris.nonEmpty) {
+      val distinctUriStrings = resultUris.map(_.toString).distinct.asJava
+      context
+        .deleteFrom(OPERATOR_PORT_EXECUTIONS)
+        .where(OPERATOR_PORT_EXECUTIONS.RESULT_URI.in(distinctUriStrings))
+        .execute()
+    }
+  }
+
+  /**
+    * Best-effort deletion of stored result documents referenced by cache entries.
+    *
+    * Each URI is opened through DocumentFactory and cleared on its own, so a failure on one
+    * document does not prevent the remaining ones from being processed. Only non-fatal
+    * exceptions are suppressed; fatal errors (for example OutOfMemoryError or
+    * InterruptedException) are allowed to propagate instead of being silently swallowed.
+    *
+    * @param resultUris URIs of the result documents to clear
+    */
+  private def clearCachedResultDocuments(resultUris: Seq[URI]): Unit = {
+    import scala.util.control.NonFatal
+
+    resultUris.foreach { uri =>
+      try {
+        DocumentFactory.openDocument(uri)._1.clear()
+      } catch {
+        case NonFatal(_) =>
+        // Document already deleted or unavailable - safe to ignore.
+      }
+    }
+  }
 
   /**
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index 0d1348c740..c27a088ac7 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -190,6 +190,7 @@ Entry point: `RegionExecutionCoordinator` constructor 
branches on `region.cached
 - Left panel "Cache" tab listing workflow cache entries (physical op id, port 
id, tuple count, source execution id, updated_at, short subdag hash)
 - Highlight cache entries usable by the current execution (fingerprint match)
 - Cache panel toggle to show only entries usable by the current execution
+- Cache panel action to clear all cached results (deletes cache entries, 
result documents, and port result records)
 - Output port labels show tuple counts, plus a second line with source 
execution id for cached outputs
 - Result URI hidden from the UI
 
@@ -257,7 +258,7 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) {
       tupleCount: Option[Long]
   ): Unit
 
-  /** Invalidate all cache entries for a workflow (lifecycle management) */
+  /** Invalidate all cache entries and cached result artifacts for a workflow 
*/
   def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit
 
   /** Future: Cost-aware eviction when storage quota exceeded */
@@ -270,7 +271,7 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) {
 - Encapsulates fingerprint computation (calls `FingerprintUtil`)
 - Handles `GlobalPortIdentity` ↔ String serialization
 - Manages tuple count propagation (best-effort via runtime stats)
-- Provides workflow-level abstractions (batch lookup, invalidation)
+- Provides workflow-level abstractions (batch lookup, invalidation + artifact 
cleanup)
 
 #### WorkflowExecutionsResource (REST API - Optional)
 
@@ -279,7 +280,8 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) {
 HTTP endpoints for external access:
 
 - `GET /executions/{workflowId}/cache?limit=<n>&offset=<n>`: List cache 
entries (result_uri omitted)
-- (Optional) `DELETE /cache/{workflowId}`: Manual cache invalidation
+- `DELETE /executions/{workflowId}/cache`: Clear all cache entries and delete 
stored result documents
+- `POST /executions/{workflowId}/cache/clear`: POST alternative for 
environments that block DELETE
 
 **Note**: Internal services use `OperatorPortCacheService`, not the REST 
resource.
 
@@ -368,7 +370,8 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
 - **Cache metadata UI** (Phase 1.3 - Complete):
   - `CacheUsageUpdateEvent` publishes cached outputs usable by the current 
execution (fingerprint match)
   - Left panel "Cache" tab lists cache entries (physical op id, port id, tuple 
count, source execution id, updated_at, short subdag hash)
-- Cache entries highlight when usable by the current execution (fingerprint 
match)
+  - Cache entries highlight when usable by the current execution (fingerprint 
match)
+  - Cache panel "Clear cache" action removes cached results and associated 
result artifacts
 - Cache panel can filter to only show entries usable by the current execution
   - Cached output ports show source execution id on a second label line
   - REST: `GET /executions/{wid}/cache` lists cache entries (result URI 
omitted)
diff --git 
a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
 
b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
index 7da51bb2b5..615ffefb73 100644
--- 
a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
+++ 
b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
@@ -107,4 +107,11 @@ export class WorkflowExecutionsService {
     }
     return 
this.http.get<WorkflowCacheEntry[]>(`${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache`,
 { params });
   }
+
+  /**
+   * Deletes all cached outputs for the workflow, including stored result artifacts.
+   *
+   * Sends a POST with an empty body to `${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/clear`.
+   *
+   * @param wid id of the workflow whose cache is cleared
+   * @returns the Observable of the HTTP request, typed as void
+   */
+  deleteWorkflowCacheEntries(wid: number): Observable<void> {
+    return 
this.http.post<void>(`${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/clear`, 
{});
+  }
 }
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
index ba1cddbe7a..785bdb1282 100644
--- 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
@@ -27,6 +27,17 @@
       (click)="refresh()">
       Refresh
     </button>
+    <button
+      nz-button
+      nzSize="small"
+      nzType="default"
+      [disabled]="loading || removing || !cacheEntries.length"
+      [nzLoading]="removing"
+      (nzOnConfirm)="clearCacheEntries()"
+      nz-popconfirm
+      nzPopconfirmTitle="Remove all cached results for this workflow?">
+      Clear cache
+    </button>
     <label class="cache-panel__toggle">
       <nz-switch
         nzSize="small"
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
index 4ed7afff98..7a2e50a426 100644
--- 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
@@ -40,6 +40,8 @@ export class CachePanelComponent implements OnInit {
   public visibleEntries: WorkflowCacheEntry[] = [];
   /** When true, only cache entries usable by the current execution are shown. 
*/
   public showUsableOnly = false;
+  /** True while the cache eviction request is in flight. */
+  public removing = false;
   public loading = false;
   private workflowId?: number;
   private usageKeys = new Set<string>();
@@ -90,6 +92,28 @@ export class CachePanelComponent implements OnInit {
       });
   }
 
+  /**
+   * Asks the backend to drop every cached output of the current workflow and, once the
+   * request succeeds, empties the locally held entry list. Does nothing when no workflow
+   * id is set. `removing` is true while the request is in flight.
+   */
+  public clearCacheEntries(): void {
+    const wid = this.workflowId;
+    if (!wid) {
+      return;
+    }
+    this.removing = true;
+    const request$ = this.workflowExecutionsService.deleteWorkflowCacheEntries(wid).pipe(
+      finalize(() => {
+        this.removing = false;
+      }),
+      untilDestroyed(this)
+    );
+    request$.subscribe({
+      next: () => {
+        this.cacheEntries = [];
+        this.updateVisibleEntries();
+      },
+    });
+  }
+
   /**
    * Returns true when a cache entry is usable by the current execution 
(fingerprint match),
    * regardless of whether the scheduler chooses to reuse it.

Reply via email to