This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit c600cc1531b1234bf723d1ed195b711df4d31ef0 Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Feb 9 21:43:16 2026 -0800 feat(cache): add cache and execution resources cleanup by lifecycle manager --- .../texera/web/dao/OperatorPortCacheDao.scala | 60 ++++++++++++++++++++++ .../user/workflow/WorkflowExecutionsResource.scala | 43 ++++++++++++++-- .../web/service/OperatorPortCacheService.scala | 48 +++++++++++++++++ .../web/service/WorkflowExecutionService.scala | 16 ++++++ .../texera/web/service/WorkflowService.scala | 50 ++++++++++-------- docs/operator-port-cache.md | 9 +++- .../workflow-executions.service.ts | 11 +++- .../cache-panel/cache-panel.component.ts | 3 ++ .../compile-workflow/workflow-compiling.service.ts | 15 ++---- 9 files changed, 217 insertions(+), 38 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala index 7476a99d3c..d656ae6fea 100644 --- a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala +++ b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala @@ -154,6 +154,47 @@ class OperatorPortCacheDao(sqlServer: SqlServer) { .toSeq } + /** + * List cache entries produced by the specified source execution IDs. + * + * @param sourceExecutionIds execution IDs that produced cached outputs + * @return cache entries with matching source_execution_id + */ + def listBySourceExecutionIds(sourceExecutionIds: Seq[Long]): Seq[OperatorPortCacheRecord] = { + val executionIds = sourceExecutionIds.distinct + if (executionIds.isEmpty) { + return Seq.empty + } + context + .select( + OPERATOR_PORT_CACHE.WORKFLOW_ID, + OPERATOR_PORT_CACHE.GLOBAL_PORT_ID, + OPERATOR_PORT_CACHE.SUBDAG_HASH, + OPERATOR_PORT_CACHE.FINGERPRINT_JSON, + OPERATOR_PORT_CACHE.RESULT_URI, + OPERATOR_PORT_CACHE.TUPLE_COUNT, + OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, + OPERATOR_PORT_CACHE.UPDATED_AT + ) + .from(OPERATOR_PORT_CACHE) + .where(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID.in(executionIds.map(Long.box).asJava)) + .fetch() + .asScala + .map(record => + OperatorPortCacheRecord( + workflowId = record.value1().longValue(), + globalPortId = record.value2(), + subdagHash = record.value3(), + fingerprintJson = record.value4(), + resultUri = URI.create(record.value5()), + tupleCount = Option(record.value6()).map(_.longValue()), + sourceExecutionId = Option(record.value7()).map(_.longValue()), + updatedAt = Option(record.value8()) + ) + ) + .toSeq + } + /** * Insert or update a cache entry (upsert). * On conflict (workflow_id, global_port_id, subdag_hash), updates the existing record and refreshes updated_at. @@ -200,6 +241,25 @@ class OperatorPortCacheDao(sqlServer: SqlServer) { .execute() } + /** + * Delete cache entries by source execution IDs. + * This removes cache metadata whose backing result artifacts were produced by + * executions that are being cleaned up. + * + * @param sourceExecutionIds Execution IDs that produced the cached outputs + * @return Number of rows deleted + */ + def deleteBySourceExecutionIds(sourceExecutionIds: Seq[Long]): Int = { + val executionIds = sourceExecutionIds.distinct + if (executionIds.isEmpty) { + return 0 + } + context + .deleteFrom(OPERATOR_PORT_CACHE) + .where(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID.in(executionIds.map(Long.box).asJava)) + .execute() + } + /** * Delete cache entries for a workflow by global port IDs. * This removes all hashes for the specified output ports. diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 00a50231cf..6b0df8fb46 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -118,6 +118,26 @@ object WorkflowExecutionsResource { } } + /** + * Retrieves all execution IDs of a workflow for a computing unit, ordered by latest first. + * + * @param wid workflow id + * @param cuid computing unit id + * @return list of execution ids + */ + def getExecutionIDs(wid: Integer, cuid: Integer): List[Integer] = { + context + .select(WORKFLOW_EXECUTIONS.EID) + .from(WORKFLOW_EXECUTIONS) + .join(WORKFLOW_VERSION) + .on(WORKFLOW_EXECUTIONS.VID.eq(WORKFLOW_VERSION.VID)) + .where(WORKFLOW_VERSION.WID.eq(wid).and(WORKFLOW_EXECUTIONS.CUID.eq(cuid))) + .orderBy(WORKFLOW_EXECUTIONS.EID.desc()) + .fetchInto(classOf[Integer]) + .asScala + .toList + } + /** * Computes which operators in a workflow are restricted due to dataset access controls. * @@ -380,13 +400,21 @@ object WorkflowExecutionsResource { /** * Removes all resources related to the specified execution IDs, - * including runtime statistics, console messages, result documents, and database records. + * including runtime statistics, console messages, result documents, cache metadata, + * and execution database records. * * @param eids Array of execution IDs to be cleaned up. */ def removeAllExecutionFiles(eids: Array[Integer]): Unit = { val eIdsLong = eids.map(_.toLong) val eIdsList = eIdsLong.toSeq.asJava + val executionIds = eIdsLong.toIndexedSeq.map(ExecutionIdentity(_)) + + // Remove cache entries that reference results produced by these executions. + val cacheService = new OperatorPortCacheService( + new OperatorPortCacheDao(SqlServer.getInstance()) + ) + cacheService.invalidateCacheBySourceExecutions(executionIds) // Collect all related document URIs (runtime stats, console logs, results) val uris: Seq[URI] = eIdsLong.toIndexedSeq.flatMap { eid => @@ -599,6 +627,13 @@ case class ExecutionRenameRequest(wid: Integer, eId: Integer, executionName: Str */ case class CacheEvictionRequest(logicalOpIds: List[String]) +/** + * Response payload for cache invalidation endpoints. + * + * @param removedCount Number of cache entries removed + */ +case class CacheInvalidationResponse(removedCount: Int) + @Produces(Array(MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM, "application/zip")) @Path("/executions") class WorkflowExecutionsResource { @@ -893,7 +928,7 @@ class WorkflowExecutionsResource { request: LogicalPlanPojo, @PathParam("wid") wid: Integer, @Auth sessionUser: SessionUser - ): Unit = { + ): CacheInvalidationResponse = { validateUserCanWriteWorkflow(sessionUser.getUser.getUid, wid) val workflow = try { val workflowContext = new WorkflowContext(workflowId = WorkflowIdentity(wid.toLong)) @@ -904,7 +939,9 @@ class WorkflowExecutionsResource { } val dao = new OperatorPortCacheDao(SqlServer.getInstance()) val cacheService = new OperatorPortCacheService(dao) - cacheService.invalidateMismatchedCacheEntries(WorkflowIdentity(wid.toLong), workflow.physicalPlan) + val removedCount = + cacheService.invalidateMismatchedCacheEntries(WorkflowIdentity(wid.toLong), workflow.physicalPlan) + CacheInvalidationResponse(removedCount) } /** diff --git a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala index e85afef1c5..5391d6d1ee 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala @@ -49,6 +49,17 @@ import scala.util.Try class OperatorPortCacheService(dao: OperatorPortCacheDao) { private val context = SqlServer.getInstance().createDSLContext() + /** + * Result of cache invalidation by source executions. + * + * @param deletedRows number of rows removed from operator_port_cache + * @param deletedResultUris cached result URIs deleted/cleared during invalidation + */ + case class SourceExecutionInvalidationResult( + deletedRows: Int, + deletedResultUris: Set[URI] + ) + /** * Lookup cached outputs for all materializable ports in the physical plan. * Called at workflow submission time by WorkflowExecutionService. @@ -137,6 +148,43 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) { deleteCacheEntriesByPorts(workflowId, cacheEntries) } + /** + * Invalidate cache entries produced by the specified executions. + * + * This removes cache metadata as well as cache-linked storage artifacts: + * - operator_port_executions rows referencing cached URIs + * - cached result documents + * + * @param executionIds Execution IDs whose produced cache entries should be deleted + * @return Number of cache rows deleted + */ + def invalidateCacheBySourceExecutions(executionIds: Seq[ExecutionIdentity]): Int = { + invalidateCacheBySourceExecutionsWithArtifacts(executionIds).deletedRows + } + + /** + * Invalidate cache entries produced by the specified executions and return deleted URI artifacts. + * + * This helper is used by lifecycle cleanup to avoid re-clearing cached documents. + * + * @param executionIds Execution IDs whose produced cache entries should be deleted + * @return deleted rows and deleted result URIs + */ + def invalidateCacheBySourceExecutionsWithArtifacts( + executionIds: Seq[ExecutionIdentity] + ): SourceExecutionInvalidationResult = { + val sourceExecutionIds = executionIds.map(_.id) + val cacheEntries = dao.listBySourceExecutionIds(sourceExecutionIds) + if (cacheEntries.isEmpty) { + return SourceExecutionInvalidationResult(0, Set.empty) + } + val resultUris = cacheEntries.map(_.resultUri).distinct + deleteOperatorPortResultsByUris(resultUris) + val deleted = dao.deleteBySourceExecutionIds(sourceExecutionIds) + clearCachedResultDocuments(resultUris) + SourceExecutionInvalidationResult(deleted, resultUris.toSet) + } + /** * Invalidate cache entries for outputs owned by the provided logical operators. * This is used by editor actions that evict cache for selected operators. diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala index 9a9a8a11b9..98cb7aad8b 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala @@ -57,6 +57,22 @@ object WorkflowExecutionService { .getLatestExecutionID(workflowId.id.toInt, computingUnitId) .map(eid => new ExecutionIdentity(eid.longValue())) } + + /** + * Retrieve all execution IDs for a workflow and computing unit. + * + * @param workflowId workflow identity + * @param computingUnitId computing unit id + * @return execution IDs ordered by newest first + */ + def getExecutionIds( + workflowId: WorkflowIdentity, + computingUnitId: Int + ): Seq[ExecutionIdentity] = { + WorkflowExecutionsResource + .getExecutionIDs(workflowId.id.toInt, computingUnitId) + .map(eid => new ExecutionIdentity(eid.longValue())) + } } class WorkflowExecutionService( diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index 2b5abd8592..c1f7f9ad8c 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -117,12 +117,9 @@ class WorkflowService( s"workflowId=$workflowId", cleanUpTimeout, () => { - // clear the storage resources associated with the latest execution - WorkflowExecutionService - .getLatestExecutionId(workflowId, computingUnitId) - .foreach(eid => { -// clearExecutionResources(eid) - }) + // Clear execution-scoped artifacts (runtime stats, result/console docs) for all executions. + val executionIds = WorkflowExecutionService.getExecutionIds(workflowId, computingUnitId) + clearExecutionResources(executionIds) WorkflowService.workflowServiceMapping.remove(mkWorkflowStateId(workflowId)) if (executionService.getValue != null) { // shutdown client @@ -327,27 +324,40 @@ class WorkflowService( } /** - * Cleans up all resources associated with a workflow execution. + * Cleans up all resources associated with workflow executions. * * This method performs resource cleanup in the following sequence: - * 1. Retrieves all document URIs associated with the execution - * 2. Clears URI references from the execution registry - * 3. Safely clears all result and console message documents - * 4. Expires Iceberg snapshots for runtime statistics - * 5. Deletes large binaries from MinIO + * 1. Retrieves all document URIs associated with the executions + * 2. Invalidates cache entries produced by these executions (cache rows + cached docs + cache-linked operator_port_executions rows) + * 3. Clears URI references from the execution registry + * 4. Safely clears all result and console message documents + * 5. Expires Iceberg snapshots for runtime statistics + * 6. Deletes large binaries from MinIO * - * @param eid The execution identity to clean up resources for + * @param executionIds execution identities to clean up resources for */ - private def clearExecutionResources(eid: ExecutionIdentity): Unit = { - // Retrieve URIs for all resources associated with this execution - val resultUris = WorkflowExecutionsResource.getResultUrisByExecutionId(eid) - val consoleMessagesUris = WorkflowExecutionsResource.getConsoleMessagesUriByExecutionId(eid) + private def clearExecutionResources(executionIds: Seq[ExecutionIdentity]): Unit = { + if (executionIds.isEmpty) { + return + } + + val runtimeStatsUris = + executionIds.flatMap(eid => WorkflowExecutionsResource.getRuntimeStatsUriByExecutionId(eid).toList) + + // Invalidate cache artifacts produced by these executions. + val cacheInvalidation = cacheService.invalidateCacheBySourceExecutionsWithArtifacts(executionIds) + + val resultUris = executionIds + .flatMap(WorkflowExecutionsResource.getResultUrisByExecutionId) + .filterNot(cacheInvalidation.deletedResultUris.contains) + val consoleMessagesUris = + executionIds.flatMap(WorkflowExecutionsResource.getConsoleMessagesUriByExecutionId) // Remove references from registry first - WorkflowExecutionsResource.deleteConsoleMessageAndExecutionResultUris(eid) + executionIds.foreach(WorkflowExecutionsResource.deleteConsoleMessageAndExecutionResultUris) // Clean up all result and console message documents - (resultUris ++ consoleMessagesUris).foreach { uri => + (resultUris ++ consoleMessagesUris).distinct.foreach { uri => try DocumentFactory.openDocument(uri)._1.clear() catch { case error: Throwable => @@ -356,7 +366,7 @@ class WorkflowService( } // Expire any Iceberg snapshots for runtime statistics - WorkflowExecutionsResource.getRuntimeStatsUriByExecutionId(eid).foreach { uri => + runtimeStatsUris.distinct.foreach { uri => try { DocumentFactory.openDocument(uri)._1 match { case iceberg: OnIceberg => iceberg.expireSnapshots() diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index 7e4c27defd..56eaf5a3f7 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -214,7 +214,11 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached - Existing protobuf messages (`OperatorStatistics`) handle cached regions naturally - No `from_cache` flag required (can infer from numWorkers=0 + instant completion) -- Lifecycle management (eviction, cleanup) deferred to future work +- Cost-aware lifecycle management (eviction policy) deferred to future work +- Lifecycle timeout cleanup clears execution-scoped artifacts (result/console/runtime stats) for all executions of the workflow on the current computing unit +- Lifecycle timeout cleanup clears cached artifacts produced by those executions via `clearExecutionResources` (single cleanup path), including cache rows, associated `operator_port_executions` rows, and cached result documents +- Lifecycle timeout cleanup excludes cached result URIs already removed by cache invalidation to avoid double-clearing documents +- Bulk execution deletion cleanup removes cache metadata rows by `source_execution_id` to avoid dangling cache entries after execution artifacts are deleted **Cache metadata UI (Implemented)**: @@ -396,6 +400,9 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - `OperatorPortCacheService`: High-level cache operations (lookupCachedOutputs, upsertCachedOutput, invalidateWorkflowCache) - `ExecutionCacheService`: Event listener bridging controller events to service layer - Event-based communication via `PortMaterialized` event and `client.registerCallback[T]` + - Execution cleanup integration: + - lifecycle timeout (cuid-scoped) clears cache artifacts via `invalidateCacheBySourceExecutions` + - bulk execution deletion removes cache artifacts via `invalidateCacheBySourceExecutions` - **Fingerprinting**: `FingerprintUtil` implemented with workflow-based specs for deterministic subDAG hashing - **Submission-time lookup**: `WorkflowExecutionService` uses `OperatorPortCacheService.lookupCachedOutputs()` to compute fingerprints for all physical output ports, queries cache, stores hits in `WorkflowSettings.cachedOutputs` - **Cache persistence**: `PortCompletedHandler` emits `PortMaterialized` event → `ExecutionCacheService` → `OperatorPortCacheService.upsertCachedOutput()` → `OperatorPortCacheDao.upsert()` (includes fingerprint, URI, tuple count) diff --git a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts index a5fe110d96..c787a7fb51 100644 --- a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts +++ b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts @@ -28,6 +28,10 @@ import { ExecutionState, LogicalPlan } from "../../../../workspace/types/execute export const WORKFLOW_EXECUTIONS_API_BASE_URL = `${AppSettings.getApiEndpoint()}/executions`; +export type CacheInvalidationResponse = Readonly<{ + removedCount: number; +}>; + @Injectable({ providedIn: "root", }) @@ -127,7 +131,10 @@ export class WorkflowExecutionsService { /** * Invalidates cache entries whose fingerprints do not match the provided logical plan. */ - invalidateWorkflowCacheEntries(wid: number, logicalPlan: LogicalPlan): Observable<void> { - return this.http.post<void>(`${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/invalidate`, logicalPlan); + invalidateWorkflowCacheEntries(wid: number, logicalPlan: LogicalPlan): Observable<CacheInvalidationResponse | null> { + return this.http.post<CacheInvalidationResponse | null>( + `${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/invalidate`, + logicalPlan + ); } } diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts index f44d48f6b9..5e9e0b9d18 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts @@ -71,6 +71,9 @@ export class CachePanelComponent implements OnInit { return; } this.workflowId = workflowId; + // Invalidation notices are compile-time, session-local feedback. + // Clear stale notices when opening/reloading the workflow cache panel. + this.cacheEntriesService.clearInvalidationNotice(); this.cacheEntriesService .getCacheEntriesStream() .pipe(untilDestroyed(this)) diff --git a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts index d2beccb2ee..b43cca630e 100644 --- a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts +++ b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts @@ -337,9 +337,6 @@ export class WorkflowCompilingService { if (!this.cacheEntriesService.isAutoInvalidationEnabled()) { return; } - const beforeKeys = new Set( - this.cacheEntriesService.getCacheEntriesSnapshot().map(entry => this.cacheEntriesService.buildEntryKey(entry)) - ); this.workflowExecutionsService .invalidateWorkflowCacheEntries(workflowId, logicalPlan) .pipe( @@ -348,15 +345,9 @@ export class WorkflowCompilingService { return EMPTY; }) ) - .subscribe(() => { - this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(entries => { - const afterKeys = new Set(entries.map(entry => this.cacheEntriesService.buildEntryKey(entry))); - let removedCount = 0; - beforeKeys.forEach(key => { - if (!afterKeys.has(key)) { - removedCount += 1; - } - }); + .subscribe(response => { + const removedCount = response?.removedCount ?? 0; + this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(() => { this.cacheEntriesService.notifyInvalidation(workflowId, removedCount); }); });
