This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 6201c9e742a3b5e4da037ceb7e1e4fcd75387391 Author: Xiaozhen Liu <[email protected]> AuthorDate: Fri Jan 16 17:38:29 2026 -0800 feat(cache): add cache invalidation, clear cache on operator, and highlighting of cached results. --- .../texera/web/dao/OperatorPortCacheDao.scala | 48 ++++++++ .../user/workflow/WorkflowExecutionsResource.scala | 60 +++++++++- .../web/service/OperatorPortCacheService.scala | 95 +++++++++++++++- docs/operator-port-cache.md | 20 +++- .../workflow-executions.service.ts | 18 ++- .../cache-panel/cache-panel.component.ts | 42 ++++--- .../context-menu/context-menu.component.html | 26 +++++ .../context-menu/context-menu.component.ts | 74 ++++++++++++- .../workflow-editor/workflow-editor.component.ts | 51 +++++++++ .../compile-workflow/workflow-compiling.service.ts | 43 ++++++- .../workspace/service/joint-ui/joint-ui.service.ts | 38 +++++++ .../workflow-cache-entries.service.ts | 123 +++++++++++++++++++++ 12 files changed, 603 insertions(+), 35 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala index c1af475b3a..a2e2591802 100644 --- a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala +++ b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala @@ -22,6 +22,7 @@ package org.apache.texera.web.dao import org.apache.texera.dao.SqlServer import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_CACHE import org.jooq.DSLContext +import org.jooq.impl.DSL import java.net.URI import java.time.OffsetDateTime @@ -197,4 +198,51 @@ class OperatorPortCacheDao(sqlServer: SqlServer) { .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt)) .execute() } + + /** + * Delete cache entries for a workflow by global port IDs. + * This removes all hashes for the specified output ports. 
+ * + * @param workflowId Workflow ID whose cache entries should be deleted + * @param globalPortIds Serialized GlobalPortIdentity values + * @return Number of rows deleted + */ + def deleteByGlobalPortIds(workflowId: Long, globalPortIds: Seq[String]): Int = { + val distinctPorts = globalPortIds.distinct + if (distinctPorts.isEmpty) { + return 0 + } + context + .deleteFrom(OPERATOR_PORT_CACHE) + .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt)) + .and(OPERATOR_PORT_CACHE.GLOBAL_PORT_ID.in(distinctPorts.asJava)) + .execute() + } + + /** + * Delete cache entries for a workflow by (global_port_id, subdag_hash) pairs. + * This removes only the specified fingerprint versions. + * + * @param workflowId Workflow ID whose cache entries should be deleted + * @param portHashes Sequence of (globalPortId, subdagHash) to remove + * @return Number of rows deleted + */ + def deleteByGlobalPortAndHashes( + workflowId: Long, + portHashes: Seq[(String, String)] + ): Int = { + if (portHashes.isEmpty) { + return 0 + } + val conditions = portHashes.map { case (portId, subdagHash) => + OPERATOR_PORT_CACHE.GLOBAL_PORT_ID + .eq(portId) + .and(OPERATOR_PORT_CACHE.SUBDAG_HASH.eq(subdagHash)) + } + context + .deleteFrom(OPERATOR_PORT_CACHE) + .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt)) + .and(DSL.or(conditions.asJava)) + .execute() + } } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 70fbdd1883..b8bf13ec67 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -28,7 +28,7 @@ import org.apache.texera.amber.core.storage.{ } import org.apache.texera.amber.core.tuple.Tuple import 
org.apache.texera.amber.core.virtualidentity._ -import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity} +import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity, WorkflowContext} import org.apache.texera.amber.engine.architecture.logreplay.{ReplayDestination, ReplayLogRecord} import org.apache.texera.amber.engine.common.Utils.{maptoStatusCode, stringToAggregatedState} import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage @@ -44,6 +44,7 @@ import org.apache.texera.dao.jooq.generated.tables.daos.WorkflowExecutionsDao import org.apache.texera.dao.jooq.generated.tables.pojos.{WorkflowExecutions, User => UserPojo} import org.apache.texera.web.dao.OperatorPortCacheDao import org.apache.texera.web.model.http.request.result.ResultExportRequest +import org.apache.texera.web.model.websocket.request.LogicalPlanPojo import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._ import org.apache.texera.web.service.{ ExecutionsMetadataPersistService, @@ -52,6 +53,7 @@ import org.apache.texera.web.service.{ } import org.jooq.DSLContext import play.api.libs.json.Json +import org.apache.texera.workflow.WorkflowCompiler import java.net.URI import java.sql.Timestamp @@ -589,6 +591,13 @@ case class ExecutionGroupDeleteRequest(wid: Integer, eIds: Array[Integer]) case class ExecutionRenameRequest(wid: Integer, eId: Integer, executionName: String) +/** + * Request payload for evicting cache entries owned by specific logical operators. 
+ * + * @param logicalOpIds Logical operator IDs whose output caches should be removed + */ +case class CacheEvictionRequest(logicalOpIds: List[String]) + @Produces(Array(MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM, "application/zip")) @Path("/executions") class WorkflowExecutionsResource { @@ -848,6 +857,55 @@ class WorkflowExecutionsResource { clearWorkflowCacheEntriesInternal(wid, sessionUser) } + /** + * Evicts cache entries for the specified logical operators. + * This removes all cached outputs produced by those operators, regardless of fingerprint. + */ + @POST + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Path("/{wid}/cache/evict") + @RolesAllowed(Array("REGULAR", "ADMIN")) + def evictWorkflowCacheEntries( + request: CacheEvictionRequest, + @PathParam("wid") wid: Integer, + @Auth sessionUser: SessionUser + ): Unit = { + validateUserCanWriteWorkflow(sessionUser.getUser.getUid, wid) + val logicalOpIds = Option(request.logicalOpIds).getOrElse(List.empty).map(_.trim).filter(_.nonEmpty) + if (logicalOpIds.isEmpty) { + return + } + val dao = new OperatorPortCacheDao(SqlServer.getInstance()) + val cacheService = new OperatorPortCacheService(dao) + cacheService.invalidateCacheForLogicalOperators(WorkflowIdentity(wid.toLong), logicalOpIds) + } + + /** + * Invalidates cache entries whose fingerprints do not match the provided logical plan. + * Intended to be called after workflow compilation during editing. 
+ */ + @POST + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Path("/{wid}/cache/invalidate") + @RolesAllowed(Array("REGULAR", "ADMIN")) + def invalidateWorkflowCacheEntries( + request: LogicalPlanPojo, + @PathParam("wid") wid: Integer, + @Auth sessionUser: SessionUser + ): Unit = { + validateUserCanWriteWorkflow(sessionUser.getUser.getUid, wid) + val workflow = try { + val workflowContext = new WorkflowContext(workflowId = WorkflowIdentity(wid.toLong)) + new WorkflowCompiler(workflowContext).compile(request) + } catch { + case err: Throwable => + throw new BadRequestException(s"Failed to compile workflow for cache invalidation: ${err.getMessage}") + } + val dao = new OperatorPortCacheDao(SqlServer.getInstance()) + val cacheService = new OperatorPortCacheService(dao) + cacheService.invalidateMismatchedCacheEntries(WorkflowIdentity(wid.toLong), workflow.physicalPlan) + } + /** * Shared handler for cache eviction endpoints. */ diff --git a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala index 82aef5f4c8..e85afef1c5 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala @@ -22,6 +22,7 @@ package org.apache.texera.web.service import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import org.apache.texera.amber.core.workflow.cache.FingerprintUtil import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity, PhysicalPlan} +import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps import org.apache.texera.dao.SqlServer import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_EXECUTIONS @@ -30,6 +31,7 @@ import org.apache.texera.amber.core.storage.DocumentFactory import java.net.URI import 
scala.jdk.CollectionConverters._ +import scala.util.Try /** * Service for operator port result caching. @@ -40,6 +42,7 @@ import scala.jdk.CollectionConverters._ * - Cache entry creation when output ports complete * - Fingerprint computation and serialization * - Cache invalidation and lifecycle management + * - Manual eviction by logical operator and compile-time mismatch cleanup * * @param dao OperatorPortCacheDao for database access */ @@ -131,10 +134,59 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) { */ def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit = { val cacheEntries = dao.listByWorkflow(workflowId.id, Int.MaxValue, 0) - val resultUris = cacheEntries.map(_.resultUri).distinct - deleteOperatorPortResultsByUris(resultUris) - dao.deleteByWorkflow(workflowId.id) - clearCachedResultDocuments(resultUris) + deleteCacheEntriesByPorts(workflowId, cacheEntries) + } + + /** + * Invalidate cache entries for outputs owned by the provided logical operators. + * This is used by editor actions that evict cache for selected operators. 
+ * + * @param workflowId Workflow ID whose cache entries should be deleted + * @param logicalOpIds Logical operator IDs whose output caches should be removed + * @return Number of cache entries invalidated + */ + def invalidateCacheForLogicalOperators( + workflowId: WorkflowIdentity, + logicalOpIds: Seq[String] + ): Int = { + val normalizedIds = logicalOpIds.map(_.trim).filter(_.nonEmpty).toSet + if (normalizedIds.isEmpty) { + return 0 + } + val cacheEntries = dao.listByWorkflow(workflowId.id, Int.MaxValue, 0) + val entriesToDelete = cacheEntries.filter { entry => + val logicalOpIdOpt = + Try(GlobalPortIdentitySerde.deserializeFromString(entry.globalPortId)) + .toOption + .map(_.opId.logicalOpId.id) + logicalOpIdOpt.exists(normalizedIds.contains) + } + deleteCacheEntriesByPorts(workflowId, entriesToDelete) + entriesToDelete.size + } + + /** + * Invalidate cache entries whose fingerprints no longer match the current plan. + * This supports compile-time invalidation after workflow edits. + * + * @param workflowId Workflow ID whose cache entries should be checked + * @param physicalPlan Latest compiled physical plan + * @return Number of cache entries invalidated + */ + def invalidateMismatchedCacheEntries( + workflowId: WorkflowIdentity, + physicalPlan: PhysicalPlan + ): Int = { + val cacheEntries = dao.listByWorkflow(workflowId.id, Int.MaxValue, 0) + val entriesToDelete = cacheEntries.filter { entry => + val fingerprintHashOpt = Try { + val portId = GlobalPortIdentitySerde.deserializeFromString(entry.globalPortId) + FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId).subdagHash + }.toOption + fingerprintHashOpt.forall(_ != entry.subdagHash) + } + deleteCacheEntriesByKeys(workflowId, entriesToDelete) + entriesToDelete.size } /** @@ -165,6 +217,41 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) { } } + /** + * Deletes cache entries by port ID, and removes associated result artifacts. 
+ */ + private def deleteCacheEntriesByPorts( + workflowId: WorkflowIdentity, + entries: Seq[OperatorPortCacheRecord] + ): Unit = { + if (entries.isEmpty) { + return + } + val resultUris = entries.map(_.resultUri).distinct + deleteOperatorPortResultsByUris(resultUris) + dao.deleteByGlobalPortIds(workflowId.id, entries.map(_.globalPortId).distinct) + clearCachedResultDocuments(resultUris) + } + + /** + * Deletes cache entries by (port ID, subDAG hash) pair, and removes associated result artifacts. + */ + private def deleteCacheEntriesByKeys( + workflowId: WorkflowIdentity, + entries: Seq[OperatorPortCacheRecord] + ): Unit = { + if (entries.isEmpty) { + return + } + val resultUris = entries.map(_.resultUri).distinct + deleteOperatorPortResultsByUris(resultUris) + dao.deleteByGlobalPortAndHashes( + workflowId.id, + entries.map(entry => (entry.globalPortId, entry.subdagHash)) + ) + clearCachedResultDocuments(resultUris) + } + /** * Future: Cost-aware eviction when storage quota is exceeded. * Phase 3: Lifecycle management research. 
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index bde5981d9e..775d06e6e8 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -192,8 +192,17 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached - Cache panel toggle to show only entries usable by the current execution - Cache panel action to clear all cached results (deletes cache entries, result documents, and port result records) - Output port labels show tuple counts, plus a second line with source execution id for cached outputs +- Output ports show a cached indicator when any cache entry exists for that port +- Editor context menu can evict cache for the selected operator or its upstream operators - Result URI hidden from the UI +**Cache UX & invalidation (Implemented)**: + +- Output ports show a cached indicator when any cache entry exists for that port (no usable/not-usable distinction on the graph) +- Context menu actions: "Clear cache" (selected operator) and "Clear cache up to this operator" (includes disabled operators and the selected operator) +- Cache invalidation on compile: evict cache entries whose fingerprints no longer match the current workflow +- TODO: Use source execution runtime stats for cached operator input/output counts, with fallback to `operator_port_cache.tuple_count` when stats are missing + **Cache usage updates**: - `CacheUsageUpdateEvent` publishes cached outputs usable by the current execution (fingerprint match) @@ -282,6 +291,8 @@ HTTP endpoints for external access: - `GET /executions/{workflowId}/cache?limit=<n>&offset=<n>`: List cache entries (result_uri omitted) - `DELETE /executions/{workflowId}/cache`: Clear all cache entries and delete stored result documents - `POST /executions/{workflowId}/cache/clear`: POST alternative for environments that block DELETE +- `POST /executions/{workflowId}/cache/evict`: Evict cache entries for specified logical operator IDs +- `POST 
/executions/{workflowId}/cache/invalidate`: Remove cache entries whose fingerprints do not match the provided logical plan **Note**: Internal services use `OperatorPortCacheService`, not the REST resource. @@ -519,7 +530,14 @@ The cache system integrates with three layers: - [X] Re-emit cache usage snapshots on websocket connect to refresh cache labels after reload - [X] Keep result URI hidden in the UI -#### 1.4 Testing & Validation +#### 1.4 Cache UX & Invalidation ✓ COMPLETE + +- [X] Show cached indicator on output ports for any cache entry +- [X] Add context-menu actions to clear cache for selected operator and upstream operators (includes disabled) +- [X] Invalidate mismatched cache entries on compile (fingerprint comparison) +- [ ] TODO: Use source execution runtime stats for cached operator counts, with tuple-count fallback + +#### 1.5 Testing & Validation - [X] Verify downstream cached URI consumption across all operator types - [ ] Add integration tests: cache upsert → DB verification diff --git a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts index 615ffefb73..a5fe110d96 100644 --- a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts +++ b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts @@ -24,7 +24,7 @@ import { HttpClient, HttpParams } from "@angular/common/http"; import { WorkflowExecutionsEntry } from "../../../type/workflow-executions-entry"; import { WorkflowCacheEntry } from "../../../type/workflow-cache-entry"; import { WorkflowRuntimeStatistics } from "../../../type/workflow-runtime-statistics"; -import { ExecutionState } from "../../../../workspace/types/execute-workflow.interface"; +import { ExecutionState, LogicalPlan } from "../../../../workspace/types/execute-workflow.interface"; export const WORKFLOW_EXECUTIONS_API_BASE_URL 
= `${AppSettings.getApiEndpoint()}/executions`; @@ -114,4 +114,20 @@ export class WorkflowExecutionsService { deleteWorkflowCacheEntries(wid: number): Observable<void> { return this.http.post<void>(`${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/clear`, {}); } + + /** + * Evicts cache entries owned by the provided logical operators. + */ + evictWorkflowCacheEntries(wid: number, logicalOpIds: ReadonlyArray<string>): Observable<void> { + return this.http.post<void>(`${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/evict`, { + logicalOpIds, + }); + } + + /** + * Invalidates cache entries whose fingerprints do not match the provided logical plan. + */ + invalidateWorkflowCacheEntries(wid: number, logicalPlan: LogicalPlan): Observable<void> { + return this.http.post<void>(`${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache/invalidate`, logicalPlan); + } } diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts index 7a2e50a426..967d9e298e 100644 --- a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts @@ -21,9 +21,9 @@ import { Component, OnInit } from "@angular/core"; import { ActivatedRoute } from "@angular/router"; import { finalize } from "rxjs/operators"; import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; -import { WorkflowExecutionsService } from "../../../../dashboard/service/user/workflow-executions/workflow-executions.service"; import { WorkflowCacheEntry } from "../../../../dashboard/type/workflow-cache-entry"; import { CacheUsageService } from "../../../service/workflow-status/cache-usage.service"; +import { WorkflowCacheEntriesService } from "../../../service/workflow-status/workflow-cache-entries.service"; /** * CachePanelComponent renders cache entry metadata for the current workflow. 
@@ -47,7 +47,7 @@ export class CachePanelComponent implements OnInit { private usageKeys = new Set<string>(); constructor( - private workflowExecutionsService: WorkflowExecutionsService, + private cacheEntriesService: WorkflowCacheEntriesService, private cacheUsageService: CacheUsageService, private route: ActivatedRoute ) {} @@ -58,6 +58,19 @@ export class CachePanelComponent implements OnInit { return; } this.workflowId = workflowId; + this.cacheEntriesService + .getCacheEntriesStream() + .pipe(untilDestroyed(this)) + .subscribe(entries => { + this.cacheEntries = [...entries]; + this.updateVisibleEntries(); + }); + this.cacheEntriesService + .getLoadingStream() + .pipe(untilDestroyed(this)) + .subscribe(loading => { + this.loading = loading; + }); this.refresh(); this.cacheUsageService .getCacheUsageStream() @@ -77,41 +90,26 @@ export class CachePanelComponent implements OnInit { if (!this.workflowId) { return; } - this.loading = true; - this.workflowExecutionsService - .retrieveWorkflowCacheEntries(this.workflowId) - .pipe( - finalize(() => { - this.loading = false; - }), - untilDestroyed(this) - ) - .subscribe(entries => { - this.cacheEntries = entries; - this.updateVisibleEntries(); - }); + this.cacheEntriesService.refreshCacheEntries(this.workflowId).pipe(untilDestroyed(this)).subscribe(); } /** - * Removes all cached outputs for the workflow and refreshes the list. + * Removes all cached outputs for the workflow and updates shared cache state. 
*/ public clearCacheEntries(): void { if (!this.workflowId) { return; } this.removing = true; - this.workflowExecutionsService - .deleteWorkflowCacheEntries(this.workflowId) + this.cacheEntriesService + .clearWorkflowCacheEntries(this.workflowId) .pipe( finalize(() => { this.removing = false; }), untilDestroyed(this) ) - .subscribe(() => { - this.cacheEntries = []; - this.updateVisibleEntries(); - }); + .subscribe(); } /** diff --git a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html index 7a6fcba6f2..c55c352321 100644 --- a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html +++ b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.html @@ -123,6 +123,32 @@ nzTheme="twotone"></span >remove reusing result </li> + <li + nz-menu-item + *ngIf="operatorMenuService.highlightedOperators.value.length === 1 && + operatorMenuService.highlightedCommentBoxes.value.length === 0 && + !hasHighlightedLinks() && + isWorkflowModifiable" + (click)="clearCacheForSelectedOperator()"> + <span + nz-icon + nzType="database" + nzTheme="outline"></span + >clear cache + </li> + <li + nz-menu-item + *ngIf="operatorMenuService.highlightedOperators.value.length === 1 && + operatorMenuService.highlightedCommentBoxes.value.length === 0 && + !hasHighlightedLinks() && + isWorkflowModifiable" + (click)="clearCacheUpToSelectedOperator()"> + <span + nz-icon + nzType="database" + nzTheme="twotone"></span + >clear cache up to this operator + </li> <li nz-menu-item *ngIf="(operatorMenuService.highlightedOperators.value.length > 0 || diff --git a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts 
b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts index 2807066b76..70e9236bc1 100644 --- a/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/context-menu/context-menu/context-menu.component.ts @@ -27,6 +27,8 @@ import { NzModalService } from "ng-zorro-antd/modal"; import { ResultExportationComponent } from "../../../result-exportation/result-exportation.component"; import { ValidationWorkflowService } from "src/app/workspace/service/validation/validation-workflow.service"; import { GuiConfigService } from "../../../../../common/service/gui-config.service"; +import { WorkflowExecutionsService } from "src/app/dashboard/service/user/workflow-executions/workflow-executions.service"; +import { WorkflowCacheEntriesService } from "src/app/workspace/service/workflow-status/workflow-cache-entries.service"; @UntilDestroy() @Component({ @@ -44,7 +46,9 @@ export class ContextMenuComponent { protected config: GuiConfigService, private workflowResultService: WorkflowResultService, private modalService: NzModalService, - private validationWorkflowService: ValidationWorkflowService + private validationWorkflowService: ValidationWorkflowService, + private workflowExecutionsService: WorkflowExecutionsService, + private cacheEntriesService: WorkflowCacheEntriesService ) { this.registerWorkflowModifiableChangedHandler(); } @@ -144,4 +148,72 @@ export class ContextMenuComponent { nzFooter: null, }); } + + /** + * Clears cached outputs produced by the selected operator. 
+ */ + public clearCacheForSelectedOperator(): void { + const workflowId = this.workflowActionService.getWorkflowMetadata()?.wid; + if (!workflowId || !this.hasExactlyOneOperatorSelected()) { + return; + } + const operatorId = this.getSelectedOperatorID(); + this.workflowExecutionsService + .evictWorkflowCacheEntries(workflowId, [operatorId]) + .pipe(untilDestroyed(this)) + .subscribe(() => { + this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(); + }); + } + + /** + * Clears cached outputs produced by the selected operator and its upstream operators. + */ + public clearCacheUpToSelectedOperator(): void { + const workflowId = this.workflowActionService.getWorkflowMetadata()?.wid; + if (!workflowId || !this.hasExactlyOneOperatorSelected()) { + return; + } + const operatorId = this.getSelectedOperatorID(); + const upstreamOperatorIds = this.collectUpstreamOperatorIds(operatorId); + this.workflowExecutionsService + .evictWorkflowCacheEntries(workflowId, upstreamOperatorIds) + .pipe(untilDestroyed(this)) + .subscribe(() => { + this.cacheEntriesService.refreshCacheEntries(workflowId).subscribe(); + }); + } + + /** + * Returns the selected operator and all upstream operator IDs (includes disabled operators). + */ + private collectUpstreamOperatorIds(operatorId: string): string[] { + const links = this.workflowActionService.getTexeraGraph().getAllLinks(); + const incoming = new Map<string, string[]>(); + links.forEach(link => { + const sourceId = link.source.operatorID; + const targetId = link.target.operatorID; + if (!incoming.has(targetId)) { + incoming.set(targetId, []); + } + incoming.get(targetId)!.push(sourceId); + }); + + const visited = new Set<string>(); + const queue: string[] = [operatorId]; + while (queue.length > 0) { + const current = queue.shift(); + if (!current || visited.has(current)) { + continue; + } + visited.add(current); + const upstream = incoming.get(current) ?? 
[]; + upstream.forEach(upstreamId => { + if (!visited.has(upstreamId)) { + queue.push(upstreamId); + } + }); + } + return Array.from(visited); + } } diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index 45b6857327..cc0126df69 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -28,9 +28,11 @@ import { fromJointPaperEvent, JointUIService, linkPathStrokeColor } from "../../ import { ValidationWorkflowService } from "../../service/validation/validation-workflow.service"; import { WorkflowActionService } from "../../service/workflow-graph/model/workflow-action.service"; import { CacheUsageService } from "../../service/workflow-status/cache-usage.service"; +import { WorkflowCacheEntriesService } from "../../service/workflow-status/workflow-cache-entries.service"; import { WorkflowStatusService } from "../../service/workflow-status/workflow-status.service"; import { ExecutionState, OperatorState } from "../../types/execute-workflow.interface"; import { LogicalPort, OperatorLink } from "../../types/workflow-common.interface"; +import { WorkflowCacheEntry } from "../../../dashboard/type/workflow-cache-entry"; import { auditTime, filter, map, takeUntil } from "rxjs/operators"; import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; import { UndoRedoService } from "../../service/undo-redo/undo-redo.service"; @@ -95,6 +97,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy private currentOpenedOperatorID: string | null = null; private removeButton!: new () => joint.linkTools.Button; private breakpointButton!: new () => joint.linkTools.Button; + private cachedEntries: ReadonlyArray<WorkflowCacheEntry> = []; constructor( private workflowActionService: 
WorkflowActionService, @@ -104,6 +107,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy private jointUIService: JointUIService, private workflowStatusService: WorkflowStatusService, private cacheUsageService: CacheUsageService, + private cacheEntriesService: WorkflowCacheEntriesService, private executeWorkflowService: ExecuteWorkflowService, private nzModalService: NzModalService, private changeDetectorRef: ChangeDetectorRef, @@ -166,6 +170,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy this.registerPortDisplayNameChangeHandler(); this.handleOperatorStatisticsUpdate(); this.handleCacheUsageUpdate(); + this.handleCacheEntriesUpdate(); this.handleRegionEvents(); this.handleOperatorSuggestionHighlightEvent(); this.handleElementDelete(); @@ -358,6 +363,52 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy }); } + /** + * Updates cached output port indicators whenever cache entries or graph structure changes. + */ + private handleCacheEntriesUpdate(): void { + this.cacheEntriesService + .getCacheEntriesStream() + .pipe(untilDestroyed(this)) + .subscribe(entries => { + this.cachedEntries = entries; + this.applyCachedPortIndicators(); + }); + + merge( + this.workflowActionService.getTexeraGraph().getOperatorAddStream(), + this.workflowActionService.getTexeraGraph().getOperatorDeleteStream(), + this.workflowActionService.getTexeraGraph().getPortAddedOrDeletedStream() + ) + .pipe(untilDestroyed(this)) + .subscribe(() => { + this.applyCachedPortIndicators(); + }); + } + + /** + * Applies cached port indicators based on the latest cache entry snapshot. 
+ */ + private applyCachedPortIndicators(): void { + const cachedPortsByOperator = new Map<string, Set<string>>(); + this.cachedEntries + .filter(entry => !entry.internal) + .forEach(entry => { + if (!cachedPortsByOperator.has(entry.logicalOpId)) { + cachedPortsByOperator.set(entry.logicalOpId, new Set<string>()); + } + cachedPortsByOperator.get(entry.logicalOpId)!.add(entry.portId.toString()); + }); + + this.workflowActionService + .getTexeraGraph() + .getAllOperators() + .forEach(op => { + const cachedPorts = cachedPortsByOperator.get(op.operatorID) ?? new Set<string>(); + this.jointUIService.changeOperatorCachedPorts(this.paper, op.operatorID, cachedPorts); + }); + } + private handleRegionEvents(): void { const Region = joint.dia.Element.define( "region", diff --git a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts index 9648d2b305..b2e3477926 100644 --- a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts +++ b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts @@ -25,7 +25,7 @@ import { AppSettings } from "../../../common/app-setting"; import { areOperatorSchemasEqual, OperatorSchema } from "../../types/operator-schema.interface"; import { ExecuteWorkflowService } from "../execute-workflow/execute-workflow.service"; import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; -import { catchError, debounceTime, mergeMap } from "rxjs/operators"; +import { catchError, debounceTime, map, mergeMap } from "rxjs/operators"; import { DynamicSchemaService } from "../dynamic-schema/dynamic-schema.service"; import { AttributeType, @@ -42,6 +42,8 @@ import { WorkflowGraphReadonly } from "../workflow-graph/model/workflow-graph"; import { serializePortIdentity } from "../../../common/util/port-identity-serde"; import { addCompilationError, areAllPortSchemasEqual } from 
"../../../common/util/workflow-compilation-utils"; import { parseLogicalOperatorPortID } from "../../../common/util/logical-operator-port-serde"; +import { WorkflowExecutionsService } from "../../../dashboard/service/user/workflow-executions/workflow-executions.service"; +import { WorkflowCacheEntriesService } from "../workflow-status/workflow-cache-entries.service"; // endpoint for workflow compile export const WORKFLOW_COMPILATION_ENDPOINT = "compile"; @@ -49,10 +51,11 @@ export const WORKFLOW_COMPILATION_ENDPOINT = "compile"; export const WORKFLOW_COMPILATION_DEBOUNCE_TIME_MS = 500; /** - * Workflow Compiling Service provides mainly 3 functionalities: + * Workflow Compiling Service provides mainly 4 functionalities: * 1. autocomplete attribute property of operators (previously done by the SchemaPropagationService) * 2. receive static errors (previously done by sending EditingTimeCompilationRequest and saving in the ExecutionStateInfo) * 3. manage PhysicalPlan (TODO: send the physical plan to the standalone WorkflowExecutingService once we have it) + * 4. invalidate mismatched cache entries after successful compilation * * When user creates and connects operators in workflow, the WorkflowCompilingService's api will be triggered, which, * propagate the schemas, compiles the user's workflow to get the physical plan and static errors(if any). 
@@ -73,7 +76,9 @@ export class WorkflowCompilingService {
     private httpClient: HttpClient,
     private workflowActionService: WorkflowActionService,
     private dynamicSchemaService: DynamicSchemaService,
-    private validationWorkflowService: ValidationWorkflowService
+    private validationWorkflowService: ValidationWorkflowService,
+    private workflowExecutionsService: WorkflowExecutionsService,
+    private cacheEntriesService: WorkflowCacheEntriesService
   ) {
     // Subscribe to compilation state changes to apply schema propagation
     this.compilationStateInfoChangedStream.subscribe(() => {
@@ -98,10 +103,10 @@ export class WorkflowCompilingService {
             this.validationWorkflowService.getValidTexeraGraph(),
             undefined
           );
-          return this.compile(logicalPlan);
+          return this.compile(logicalPlan).pipe(map(response => ({ response, logicalPlan })));
         })
       )
-      .subscribe(response => {
+      .subscribe(({ response, logicalPlan }) => {
         if (response.physicalPlan) {
           this.currentCompilationStateInfo = {
             state: CompilationState.Succeeded,
@@ -116,6 +121,8 @@ export class WorkflowCompilingService {
           };
         }
         this.compilationStateInfoChangedStream.next(this.currentCompilationStateInfo.state);
+        // The callee returns early unless compilation succeeded and the workflow has a positive ID.
+        this.invalidateMismatchedCacheEntries(logicalPlan);
       });
   }
 
@@ -313,6 +320,32 @@ export class WorkflowCompilingService {
     );
   }
 
+  /**
+   * Triggers cache invalidation after a successful compilation.
+   * Cache entries with mismatched fingerprints are removed on the backend.
+   */
+  private invalidateMismatchedCacheEntries(logicalPlan: LogicalPlan): void {
+    const workflowId = this.workflowActionService.getWorkflowMetadata().wid;
+    if (
+      workflowId === undefined ||
+      workflowId <= 0 ||
+      this.currentCompilationStateInfo.state !== CompilationState.Succeeded
+    ) {
+      return;
+    }
+    this.workflowExecutionsService
+      .invalidateWorkflowCacheEntries(workflowId, logicalPlan)
+      .pipe(
+        // Chain the refresh (no nested subscribe) so a refresh failure also reaches catchError.
+        mergeMap(() => this.cacheEntriesService.refreshCacheEntries(workflowId)),
+        catchError(err => {
+          console.warn("cache invalidation failed during compilation", err);
+          return EMPTY;
+        })
+      )
+      .subscribe();
+  }
+
   public static setOperatorInputAttrs(
     operatorSchema: OperatorSchema,
     inputPortSchemaMap: OperatorPortSchemaMap | undefined
diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
index c174e63958..7f098a3e10 100644
--- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
+++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
@@ -425,6 +425,36 @@ export class JointUIService {
       }
     });
   }
+
+  /**
+   * Updates cached output port indicator rings without changing counts or labels.
+   *
+   * @param jointPaper paper used to look up the operator element
+   * @param operatorID ID of the operator element; nothing happens when no element has this ID
+   * @param cachedPortIds IDs of the output ports to mark as cached; every ring of the operator is
+   *   hidden when this is omitted
+   */
+  public changeOperatorCachedPorts(
+    jointPaper: joint.dia.Paper,
+    operatorID: string,
+    cachedPortIds?: Set<string>
+  ): void {
+    const element = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model;
+    if (!element) {
+      return;
+    }
+    const outPorts = element.getPorts().filter(p => p.group === "out");
+    outPorts.forEach(portDef => {
+      const portId = portDef.id;
+      if (portId != null) {
+        // Match on the second dash-separated segment of the port ID, or the whole ID if it has no dash.
+        const parts = portId.split("-");
+        const numericSuffix = parts.length > 1 ? parts[1] : portId;
+        const isCached = cachedPortIds?.has(numericSuffix) ?? false;
+        element.portProp(portId, "attrs/.port-cache-indicator/display", isCached ? "block" : "none");
+      }
+    });
+  }
   public foldOperatorDetails(jointPaper: joint.dia.Paper, operatorID: string): void {
     jointPaper.getModelById(operatorID).attr({
       [`.${operatorStateClass}`]: { visibility: "hidden" },
@@ -679,6 +709,13 @@ export class JointUIService {
         r: 5,
         stroke: "none",
       },
+      ".port-cache-indicator": {
+        fill: "none",
+        r: 7,
+        stroke: "#1890ff",
+        "stroke-width": 1.5,
+        display: "none",
+      },
       ".port-label": {
         visibility: "visible",
         event: "input-label:evt",
@@ -696,6 +733,13 @@ export class JointUIService {
    */
   public static getCustomPortMarkup(): any[] {
     return [
+      {
+        tagName: "circle",
+        selector: ".port-cache-indicator",
+        attributes: {
+          class: "port-cache-indicator",
+        },
+      },
       {
         tagName: "circle",
         selector: ".port-body",
diff --git a/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
new file mode 100644
index 0000000000..b2f5ca0597
--- /dev/null
+++ b/frontend/src/app/workspace/service/workflow-status/workflow-cache-entries.service.ts
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Injectable } from "@angular/core";
+import { BehaviorSubject, defer, Observable } from "rxjs";
+import { finalize, tap } from "rxjs/operators";
+import { WorkflowExecutionsService } from "../../../dashboard/service/user/workflow-executions/workflow-executions.service";
+import { WorkflowCacheEntry } from "../../../dashboard/type/workflow-cache-entry";
+import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service";
+
+/**
+ * Shares workflow cache entry state across components and keeps it in sync with the backend.
+ */
+@Injectable({
+  providedIn: "root",
+})
+export class WorkflowCacheEntriesService {
+  private readonly cacheEntriesSubject = new BehaviorSubject<ReadonlyArray<WorkflowCacheEntry>>([]);
+  private readonly loadingSubject = new BehaviorSubject<boolean>(false);
+  private currentWorkflowId?: number;
+  // Number of refresh requests in flight; the loading flag stays true until all of them finish.
+  private pendingRefreshes = 0;
+
+  constructor(
+    private workflowExecutionsService: WorkflowExecutionsService,
+    private workflowActionService: WorkflowActionService
+  ) {
+    this.workflowActionService.workflowMetaDataChanged().subscribe(metadata => {
+      const workflowId = metadata.wid ?? 0;
+      if (workflowId <= 0) {
+        this.currentWorkflowId = undefined;
+        this.cacheEntriesSubject.next([]);
+        return;
+      }
+      if (workflowId !== this.currentWorkflowId) {
+        this.currentWorkflowId = workflowId;
+        this.refreshInBackground(workflowId);
+      }
+    });
+
+    const initialMetadata = this.workflowActionService.getWorkflowMetadata();
+    const initialWorkflowId = initialMetadata?.wid ?? 0;
+    if (initialWorkflowId > 0 && initialWorkflowId !== this.currentWorkflowId) {
+      this.currentWorkflowId = initialWorkflowId;
+      this.refreshInBackground(initialWorkflowId);
+    }
+  }
+
+  /**
+   * Returns a stream of cache entries for the active workflow.
+   */
+  public getCacheEntriesStream(): Observable<ReadonlyArray<WorkflowCacheEntry>> {
+    return this.cacheEntriesSubject.asObservable();
+  }
+
+  /**
+   * Returns a stream of cache entry loading state.
+   */
+  public getLoadingStream(): Observable<boolean> {
+    return this.loadingSubject.asObservable();
+  }
+
+  /**
+   * Returns the latest cache entry snapshot.
+   */
+  public getCacheEntriesSnapshot(): ReadonlyArray<WorkflowCacheEntry> {
+    return this.cacheEntriesSubject.value;
+  }
+
+  /**
+   * Refreshes cache entries for a workflow and updates shared state.
+   * The request starts on subscription. Its result is published to the shared stream only if the
+   * workflow is still the active one when the response arrives, so a late response for a previous
+   * workflow cannot overwrite newer state; the returned observable always emits the result.
+   *
+   * @param workflowId Workflow ID to refresh entries for
+   */
+  public refreshCacheEntries(workflowId: number): Observable<WorkflowCacheEntry[]> {
+    if (workflowId > 0 && workflowId !== this.currentWorkflowId) {
+      this.currentWorkflowId = workflowId;
+    }
+    // defer() raises the loading flag on subscription, so an unsubscribed call cannot leave it stuck.
+    return defer(() => {
+      this.pendingRefreshes++;
+      this.loadingSubject.next(true);
+      return this.workflowExecutionsService.retrieveWorkflowCacheEntries(workflowId).pipe(
+        tap(entries => {
+          if (workflowId === this.currentWorkflowId) {
+            this.cacheEntriesSubject.next(entries);
+          }
+        }),
+        finalize(() => {
+          this.pendingRefreshes--;
+          this.loadingSubject.next(this.pendingRefreshes > 0);
+        })
+      );
+    });
+  }
+
+  /**
+   * Clears all cached outputs for a workflow and updates shared state.
+   * Shared state is reset only when the cleared workflow is the active one.
+   *
+   * @param workflowId Workflow ID whose cache entries should be removed
+   */
+  public clearWorkflowCacheEntries(workflowId: number): Observable<void> {
+    return this.workflowExecutionsService.deleteWorkflowCacheEntries(workflowId).pipe(
+      tap(() => {
+        if (workflowId === this.currentWorkflowId) {
+          this.cacheEntriesSubject.next([]);
+        }
+      })
+    );
+  }
+
+  /**
+   * Returns cached output port IDs for a logical operator.
+   *
+   * @param operatorId Logical operator ID
+   */
+  public getCachedOutputPorts(operatorId: string): Set<string> {
+    const cachedPorts = new Set<string>();
+    this.cacheEntriesSubject.value
+      .filter(entry => entry.logicalOpId === operatorId && !entry.internal)
+      .forEach(entry => cachedPorts.add(entry.portId.toString()));
+    return cachedPorts;
+  }
+
+  /**
+   * Refreshes entries when nobody observes the result, logging a failure instead of leaving
+   * the error unhandled.
+   */
+  private refreshInBackground(workflowId: number): void {
+    this.refreshCacheEntries(workflowId).subscribe({
+      error: (err: unknown) => console.warn("failed to refresh workflow cache entries", err),
+    });
+  }
+}
