This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 7858f5cfc148f7a7fc6617d2aa4a180234b1a439 Author: Xiaozhen Liu <[email protected]> AuthorDate: Thu Jan 15 09:52:29 2026 -0800 feat(cache): add clearing cache for a workflow. --- .../user/workflow/WorkflowExecutionsResource.scala | 55 +++++++++++++++++++++- .../web/service/OperatorPortCacheService.scala | 50 ++++++++++++++++++-- docs/operator-port-cache.md | 11 +++-- .../workflow-executions.service.ts | 7 +++ .../cache-panel/cache-panel.component.html | 11 +++++ .../cache-panel/cache-panel.component.ts | 24 ++++++++++ 6 files changed, 148 insertions(+), 10 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 52d3f43123..70fbdd1883 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -45,7 +45,11 @@ import org.apache.texera.dao.jooq.generated.tables.pojos.{WorkflowExecutions, Us import org.apache.texera.web.dao.OperatorPortCacheDao import org.apache.texera.web.model.http.request.result.ResultExportRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._ -import org.apache.texera.web.service.{ExecutionsMetadataPersistService, ResultExportService} +import org.apache.texera.web.service.{ + ExecutionsMetadataPersistService, + OperatorPortCacheService, + ResultExportService +} import org.jooq.DSLContext import play.api.libs.json.Json @@ -815,6 +819,49 @@ class WorkflowExecutionsResource { .toList } + /** + * Removes all cached outputs for a workflow and deletes their stored result documents. + * + * This also deletes operator_port_executions rows that reference the cached result URIs. + * The cleanup is delegated to OperatorPortCacheService for consistency. + */ + @DELETE + @Path("/{wid}/cache") + @RolesAllowed(Array("REGULAR", "ADMIN")) + def deleteWorkflowCacheEntries( + @PathParam("wid") wid: Integer, + @Auth sessionUser: SessionUser + ): Unit = { + clearWorkflowCacheEntriesInternal(wid, sessionUser) + } + + /** + * Clears cached outputs using POST for environments that block DELETE. + */ + @POST + @Path("/{wid}/cache/clear") + @RolesAllowed(Array("REGULAR", "ADMIN")) + def clearWorkflowCacheEntries( + @PathParam("wid") wid: Integer, + @Auth sessionUser: SessionUser + ): Unit = { + clearWorkflowCacheEntriesInternal(wid, sessionUser) + } + + /** + * Shared handler for cache eviction endpoints. + */ + private def clearWorkflowCacheEntriesInternal( + wid: Integer, + sessionUser: SessionUser + ): Unit = { + validateUserCanWriteWorkflow(sessionUser.getUser.getUid, wid) + + val dao = new OperatorPortCacheDao(SqlServer.getInstance()) + val cacheService = new OperatorPortCacheService(dao) + cacheService.invalidateWorkflowCache(WorkflowIdentity(wid.toLong)) + } + /** Sets a group of executions' bookmarks to the payload passed in the body. */ @PUT @Consumes(Array(MediaType.APPLICATION_JSON)) @@ -850,6 +897,12 @@ class WorkflowExecutionsResource { throw new WebApplicationException(Response.Status.UNAUTHORIZED) } + /** Determine if the user is authorized to modify the workflow, if not raise 401. */ + private def validateUserCanWriteWorkflow(uid: Integer, wid: Integer): Unit = { + if (!WorkflowAccessResource.hasWriteAccess(wid, uid)) + throw new WebApplicationException(Response.Status.UNAUTHORIZED) + } + /** Delete a group of executions */ @PUT @Consumes(Array(MediaType.APPLICATION_JSON)) diff --git a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala index 9e0155e3a4..82aef5f4c8 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala @@ -23,9 +23,13 @@ import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, Workflow import org.apache.texera.amber.core.workflow.cache.FingerprintUtil import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity, PhysicalPlan} import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps +import org.apache.texera.dao.SqlServer +import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_EXECUTIONS import org.apache.texera.web.dao.{OperatorPortCacheDao, OperatorPortCacheRecord} +import org.apache.texera.amber.core.storage.DocumentFactory import java.net.URI +import scala.jdk.CollectionConverters._ /** * Service for operator port result caching. @@ -40,6 +44,7 @@ import java.net.URI * @param dao OperatorPortCacheDao for database access */ class OperatorPortCacheService(dao: OperatorPortCacheDao) { + private val context = SqlServer.getInstance().createDSLContext() /** * Lookup cached outputs for all materializable ports in the physical plan. @@ -113,16 +118,51 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) { } /** - * Invalidate all cache entries for a workflow. - * Useful for: - * - Manual cache clearing via REST API - * - Workflow deletion (cleanup) - * - Testing + * Invalidate all cache entries for a workflow and remove associated result artifacts. + * This is used for manual cache clearing, workflow deletion, and testing. + * + * Steps: + * 1. Collect cached result URIs + * 2. Remove operator_port_executions rows referencing those URIs + * 3. Delete cache entries + * 4. Clear stored result documents (best-effort) * * @param workflowId Workflow ID whose cache entries should be deleted */ def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit = { + val cacheEntries = dao.listByWorkflow(workflowId.id, Int.MaxValue, 0) + val resultUris = cacheEntries.map(_.resultUri).distinct + deleteOperatorPortResultsByUris(resultUris) dao.deleteByWorkflow(workflowId.id) + clearCachedResultDocuments(resultUris) + } + + /** + * Deletes operator_port_executions rows that reference the provided result URIs. + */ + private def deleteOperatorPortResultsByUris(resultUris: Seq[URI]): Unit = { + if (resultUris.isEmpty) { + return + } + val uriStrings = resultUris.map(_.toString).distinct.asJava + context + .deleteFrom(OPERATOR_PORT_EXECUTIONS) + .where(OPERATOR_PORT_EXECUTIONS.RESULT_URI.in(uriStrings)) + .execute() + } + + /** + * Best-effort deletion of stored result documents referenced by cache entries. + */ + private def clearCachedResultDocuments(resultUris: Seq[URI]): Unit = { + resultUris.foreach { uri => + try { + DocumentFactory.openDocument(uri)._1.clear() + } catch { + case _: Throwable => + // Document already deleted or unavailable - safe to ignore. + } + } } /** diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index 0d1348c740..c27a088ac7 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -190,6 +190,7 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached - Left panel "Cache" tab listing workflow cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) - Highlight cache entries usable by the current execution (fingerprint match) - Cache panel toggle to show only entries usable by the current execution +- Cache panel action to clear all cached results (deletes cache entries, result documents, and port result records) - Output port labels show tuple counts, plus a second line with source execution id for cached outputs - Result URI hidden from the UI @@ -257,7 +258,7 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) { tupleCount: Option[Long] ): Unit - /** Invalidate all cache entries for a workflow (lifecycle management) */ + /** Invalidate all cache entries and cached result artifacts for a workflow */ def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit /** Future: Cost-aware eviction when storage quota exceeded */ @@ -270,7 +271,7 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) { - Encapsulates fingerprint computation (calls `FingerprintUtil`) - Handles `GlobalPortIdentity` ↔ String serialization - Manages tuple count propagation (best-effort via runtime stats) -- Provides workflow-level abstractions (batch lookup, invalidation) +- Provides workflow-level abstractions (batch lookup, invalidation + artifact cleanup) #### WorkflowExecutionsResource (REST API - Optional) @@ -279,7 +280,8 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) { HTTP endpoints for external access: - `GET /executions/{workflowId}/cache?limit=<n>&offset=<n>`: List cache entries (result_uri omitted) -- (Optional) `DELETE /cache/{workflowId}`: Manual cache invalidation +- `DELETE /executions/{workflowId}/cache`: Clear all cache entries and delete stored result documents +- `POST /executions/{workflowId}/cache/clear`: POST alternative for environments that block DELETE **Note**: Internal services use `OperatorPortCacheService`, not the REST resource. @@ -368,7 +370,8 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - **Cache metadata UI** (Phase 1.3 - Complete): - `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint match) - Left panel "Cache" tab lists cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) -- Cache entries highlight when usable by the current execution (fingerprint match) + - Cache entries highlight when usable by the current execution (fingerprint match) + - Cache panel "Clear cache" action removes cached results and associated result artifacts - Cache panel can filter to only show entries usable by the current execution - Cached output ports show source execution id on a second label line - REST: `GET /executions/{wid}/cache` lists cache entries (result URI omitted) diff --git a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts index 7da51bb2b5..615ffefb73 100644 --- a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts +++ b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts @@ -107,4 +107,11 @@ export class WorkflowExecutionsService { } return this.http.get<WorkflowCacheEntry[]>(`${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache`, { params }); } + + /** + * Deletes all cached outputs for the workflow, including stored result artifacts. + */ + deleteWorkflowCacheEntries(wid: number): Observable<void> { + return this.http.post<void>(`${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/clear`, {}); + } } diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html index ba1cddbe7a..785bdb1282 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html @@ -27,6 +27,17 @@ (click)="refresh()"> Refresh </button> + <button + nz-button + nzSize="small" + nzType="default" + [disabled]="loading || removing || !cacheEntries.length" + [nzLoading]="removing" + (nzOnConfirm)="clearCacheEntries()" + nz-popconfirm + nzPopconfirmTitle="Remove all cached results for this workflow?"> + Clear cache + </button> <label class="cache-panel__toggle"> <nz-switch nzSize="small" diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts index 4ed7afff98..7a2e50a426 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts @@ -40,6 +40,8 @@ export class CachePanelComponent implements OnInit { public visibleEntries: WorkflowCacheEntry[] = []; /** When true, only cache entries usable by the current execution are shown. */ public showUsableOnly = false; + /** True while the cache eviction request is in flight. */ + public removing = false; public loading = false; private workflowId?: number; private usageKeys = new Set<string>(); @@ -90,6 +92,28 @@ export class CachePanelComponent implements OnInit { }); } + /** + * Removes all cached outputs for the workflow and refreshes the list. + */ + public clearCacheEntries(): void { + if (!this.workflowId) { + return; + } + this.removing = true; + this.workflowExecutionsService + .deleteWorkflowCacheEntries(this.workflowId) + .pipe( + finalize(() => { + this.removing = false; + }), + untilDestroyed(this) + ) + .subscribe(() => { + this.cacheEntries = []; + this.updateVisibleEntries(); + }); + } + /** * Returns true when a cache entry is usable by the current execution (fingerprint match), * regardless of whether the scheduler chooses to reuse it.
