This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 84433a3825dd3928534ac4b5a343c323d772ae8a Author: Xiaozhen Liu <[email protected]> AuthorDate: Wed Jan 14 19:00:45 2026 -0800 feat(cache): display cache information on the frontend. --- .../texera/web/dao/OperatorPortCacheDao.scala | 59 ++++++++++- .../websocket/event/CacheUsageUpdateEvent.scala | 48 +++++++++ .../websocket/event/TexeraWebSocketEvent.scala | 1 + .../user/workflow/WorkflowExecutionsResource.scala | 67 ++++++++++++ .../web/service/WorkflowExecutionService.scala | 60 +++++++++-- .../texera/web/service/WorkflowService.scala | 18 +++- .../web/storage/ExecutionCacheUsageStore.scala | 34 ++++++ .../texera/web/storage/ExecutionStateStore.scala | 13 ++- docs/operator-port-cache.md | 46 +++++++-- frontend/src/app/app.module.ts | 2 + .../workflow-executions.service.ts | 16 +++ .../src/app/dashboard/type/workflow-cache-entry.ts | 33 ++++++ .../cache-panel/cache-panel.component.html | 85 +++++++++++++++ .../cache-panel/cache-panel.component.scss | 37 +++++++ .../cache-panel/cache-panel.component.ts | 114 +++++++++++++++++++++ .../component/left-panel/left-panel.component.ts | 9 +- .../workflow-editor/workflow-editor.component.ts | 27 ++++- .../workspace/service/joint-ui/joint-ui.service.ts | 107 +++++++++++++++++-- .../service/workflow-status/cache-usage.service.ts | 81 +++++++++++++++ .../types/workflow-websocket.interface.ts | 17 +++ 20 files changed, 846 insertions(+), 28 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala index f6bf30efb2..c1af475b3a 100644 --- a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala +++ b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala @@ -24,6 +24,8 @@ import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_CACHE import org.jooq.DSLContext import java.net.URI +import java.time.OffsetDateTime +import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ /** @@ -36,6 +38,8 @@ import scala.jdk.OptionConverters._ * @param resultUri URI of the materialized output * @param tupleCount Number of tuples in the cached output (optional) * @param sourceExecutionId Execution ID that produced this cache entry (optional) + * @param updatedAt Last update timestamp for this cache entry (optional; DB-managed). + * Uses OffsetDateTime to align with Jooq's TIMESTAMPTZ mapping. * * Note: updated_at timestamp is managed by the database (DEFAULT now()) */ @@ -46,7 +50,8 @@ case class OperatorPortCacheRecord( fingerprintJson: String, resultUri: URI, tupleCount: Option[Long], - sourceExecutionId: Option[Long] + sourceExecutionId: Option[Long], + updatedAt: Option[OffsetDateTime] = None ) /** @@ -79,7 +84,8 @@ class OperatorPortCacheDao(sqlServer: SqlServer) { OPERATOR_PORT_CACHE.FINGERPRINT_JSON, OPERATOR_PORT_CACHE.RESULT_URI, OPERATOR_PORT_CACHE.TUPLE_COUNT, - OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID + OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, + OPERATOR_PORT_CACHE.UPDATED_AT ) .from(OPERATOR_PORT_CACHE) .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt)) @@ -95,11 +101,58 @@ class OperatorPortCacheDao(sqlServer: SqlServer) { fingerprintJson = record.value4(), resultUri = URI.create(record.value5()), tupleCount = Option(record.value6()).map(_.longValue()), - sourceExecutionId = Option(record.value7()).map(_.longValue()) + sourceExecutionId = Option(record.value7()).map(_.longValue()), + updatedAt = Option(record.value8()) ) } } + /** + * List cache entries for a workflow, ordered by most recent update. + * + * @param workflowId Workflow ID to list cache entries for + * @param limit Max number of entries to return + * @param offset Offset into the result set for pagination + * @return Cache entries ordered by updated_at descending + */ + def listByWorkflow( + workflowId: Long, + limit: Int, + offset: Int + ): Seq[OperatorPortCacheRecord] = { + context + .select( + OPERATOR_PORT_CACHE.WORKFLOW_ID, + OPERATOR_PORT_CACHE.GLOBAL_PORT_ID, + OPERATOR_PORT_CACHE.SUBDAG_HASH, + OPERATOR_PORT_CACHE.FINGERPRINT_JSON, + OPERATOR_PORT_CACHE.RESULT_URI, + OPERATOR_PORT_CACHE.TUPLE_COUNT, + OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, + OPERATOR_PORT_CACHE.UPDATED_AT + ) + .from(OPERATOR_PORT_CACHE) + .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt)) + .orderBy(OPERATOR_PORT_CACHE.UPDATED_AT.desc()) + .limit(limit) + .offset(offset) + .fetch() + .asScala + .map(record => + OperatorPortCacheRecord( + workflowId = record.value1().longValue(), + globalPortId = record.value2(), + subdagHash = record.value3(), + fingerprintJson = record.value4(), + resultUri = URI.create(record.value5()), + tupleCount = Option(record.value6()).map(_.longValue()), + sourceExecutionId = Option(record.value7()).map(_.longValue()), + updatedAt = Option(record.value8()) + ) + ) + .toSeq + } + /** * Insert or update a cache entry (upsert). * On conflict (workflow_id, global_port_id, subdag_hash), updates the existing record. diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheUsageUpdateEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheUsageUpdateEvent.scala new file mode 100644 index 0000000000..153d98248f --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheUsageUpdateEvent.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.model.websocket.event + +/** + * Cache usage metadata for a single output port matched during submission-time lookup. + * + * @param globalPortId Serialized GlobalPortIdentity string + * @param logicalOpId Logical operator id owning the port + * @param layerName Physical operator layer name + * @param portId Output port id + * @param internal Whether the port is internal + * @param subdagHash SHA-256 hash of the upstream subDAG fingerprint + * @param tupleCount Cached tuple count (optional) + * @param sourceExecutionId Execution id that produced the cached output (optional) + */ +case class CachedPortUsage( + globalPortId: String, + logicalOpId: String, + layerName: String, + portId: Int, + internal: Boolean, + subdagHash: String, + tupleCount: Option[Long], + sourceExecutionId: Option[Long] +) + +/** + * Websocket event that surfaces cache metadata matched for the current execution. + */ +case class CacheUsageUpdateEvent(cachedOutputs: List[CachedPortUsage]) extends TexeraWebSocketEvent diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala index da072c80ea..a4953c3e1e 100644 --- a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala @@ -35,6 +35,7 @@ import org.apache.texera.web.model.websocket.response.{HeartBeatResponse, Modify new Type(value = classOf[WebResultUpdateEvent]), new Type(value = classOf[ConsoleUpdateEvent]), new Type(value = classOf[CacheStatusUpdateEvent]), + new Type(value = classOf[CacheUsageUpdateEvent]), new Type(value = classOf[PaginatedResultEvent]), new Type(value = classOf[PythonExpressionEvaluateResponse]), new Type(value = classOf[WorkerAssignmentUpdateEvent]), diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index f0b661e524..52d3f43123 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -33,6 +33,7 @@ import org.apache.texera.amber.engine.architecture.logreplay.{ReplayDestination, import org.apache.texera.amber.engine.common.Utils.{maptoStatusCode, stringToAggregatedState} import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps import org.apache.texera.auth.{JwtParser, SessionUser} import org.apache.texera.dao.SqlServer @@ -41,6 +42,7 @@ import org.apache.texera.dao.jooq.generated.Tables._ import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum import org.apache.texera.dao.jooq.generated.tables.daos.WorkflowExecutionsDao import org.apache.texera.dao.jooq.generated.tables.pojos.{WorkflowExecutions, User => UserPojo} +import org.apache.texera.web.dao.OperatorPortCacheDao import org.apache.texera.web.model.http.request.result.ResultExportRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._ import org.apache.texera.web.service.{ExecutionsMetadataPersistService, ResultExportService} @@ -554,6 +556,23 @@ object WorkflowExecutionsResource { numWorkers: Int, status: Int ) + + /** + * Cache entry metadata returned for a workflow. + * + * result_uri is intentionally omitted from the payload. + */ + case class WorkflowCacheEntry( + globalPortId: String, + logicalOpId: String, + layerName: String, + portId: Int, + internal: Boolean, + subdagHash: String, + tupleCount: Option[Long], + sourceExecutionId: Option[Long], + updatedAt: Timestamp + ) } case class ExecutionGroupBookmarkRequest( @@ -748,6 +767,54 @@ class WorkflowExecutionsResource { .toList } + /** + * Returns cache entries for a workflow, ordered by most recent update. + * + * @param wid workflow ID + * @param sessionUser authenticated user + * @param limit max number of entries to return (optional; defaults to all) + * @param offset pagination offset (optional) + */ + @GET + @Produces(Array(MediaType.APPLICATION_JSON)) + @Path("/{wid}/cache") + @RolesAllowed(Array("REGULAR", "ADMIN")) + def retrieveWorkflowCacheEntries( + @PathParam("wid") wid: Integer, + @Auth sessionUser: SessionUser, + @QueryParam("limit") limit: Integer, + @QueryParam("offset") offset: Integer + ): List[WorkflowCacheEntry] = { + validateUserCanAccessWorkflow(sessionUser.getUser.getUid, wid) + + val effectiveLimit = + Option(limit).map(_.toInt).filter(_ > 0).getOrElse(Int.MaxValue) + val effectiveOffset = + Option(offset).map(_.toInt).filter(_ >= 0).getOrElse(0) + + val dao = new OperatorPortCacheDao(SqlServer.getInstance()) + dao + .listByWorkflow(wid.toLong, effectiveLimit, effectiveOffset) + .map { record => + val globalPortId = + GlobalPortIdentitySerde.deserializeFromString(record.globalPortId) + WorkflowCacheEntry( + globalPortId = record.globalPortId, + logicalOpId = globalPortId.opId.logicalOpId.id, + layerName = globalPortId.opId.layerName, + portId = globalPortId.portId.id, + internal = globalPortId.portId.internal, + subdagHash = record.subdagHash, + tupleCount = record.tupleCount, + sourceExecutionId = record.sourceExecutionId, + updatedAt = record.updatedAt + .map(odt => Timestamp.from(odt.toInstant)) + .getOrElse(new Timestamp(0L)) + ) + } + .toList + } + /** Sets a group of executions' bookmarks to the payload passed in the body. */ @PUT @Consumes(Array(MediaType.APPLICATION_JSON)) diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala index 7777aaf7ab..4e0e841645 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala @@ -29,15 +29,18 @@ import org.apache.texera.amber.engine.common.Utils import org.apache.texera.amber.engine.common.client.AmberClient import org.apache.texera.amber.engine.common.executionruntimestate.ExecutionMetadataStore import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity} +import org.apache.texera.amber.core.workflow.cache.FingerprintUtil import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps import org.apache.texera.web.model.websocket.event.{ + CacheUsageUpdateEvent, + CachedPortUsage, TexeraWebSocketEvent, WorkflowErrorEvent, WorkflowStateEvent } import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource -import org.apache.texera.web.storage.ExecutionStateStore +import org.apache.texera.web.storage.{ExecutionCacheUsageStore, ExecutionStateStore} import org.apache.texera.web.storage.ExecutionStateStore.updateWorkflowState import org.apache.texera.web.{ComputingUnitMaster, SubscriptionManager, WebsocketInput} import org.apache.texera.workflow.WorkflowCompiler @@ -87,6 +90,11 @@ class WorkflowExecutionService( outputEvents }) ) + addSubscription( + executionStateStore.cacheUsageStore.registerDiffHandler((_, newState) => { + Iterable(CacheUsageUpdateEvent(newState.cachedOutputs)) + }) + ) private def createStateEvent(state: ExecutionMetadataStore): WorkflowStateEvent = { if (state.isRecovering && state.state != COMPLETED) { @@ -107,21 +115,61 @@ class WorkflowExecutionService( var executionConsoleService: ExecutionConsoleService = _ var executionCacheService: ExecutionCacheService = _ + /** + * Lookup cached outputs for the physical plan and return them keyed by GlobalPortIdentity. + * + * This is used both for workflow settings (serialized key map) and for cache + * metadata updates sent to the UI. + */ private def computeCachedOutputs( physicalPlan: org.apache.texera.amber.core.workflow.PhysicalPlan - ): Map[String, CachedOutput] = { - cacheService - .lookupCachedOutputs(workflowContext.workflowId, physicalPlan) - .map { case (gpid, cached) => gpid.serializeAsString -> cached } + ): Map[GlobalPortIdentity, CachedOutput] = { + cacheService.lookupCachedOutputs(workflowContext.workflowId, physicalPlan) + } + + /** + * Build cache usage metadata for the current execution from matched cached outputs. + */ + private def buildCacheUsageEntries( + physicalPlan: org.apache.texera.amber.core.workflow.PhysicalPlan, + cachedOutputs: Map[GlobalPortIdentity, CachedOutput] + ): List[CachedPortUsage] = { + cachedOutputs.toList + .map { + case (gpid, cached) => + val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, gpid) + CachedPortUsage( + globalPortId = gpid.serializeAsString, + logicalOpId = gpid.opId.logicalOpId.id, + layerName = gpid.opId.layerName, + portId = gpid.portId.id, + internal = gpid.portId.internal, + subdagHash = fingerprint.subdagHash, + tupleCount = cached.tupleCount, + sourceExecutionId = cached.sourceExecutionId.map(_.id) + ) + } + .sortBy(entry => (entry.logicalOpId, entry.layerName, entry.portId)) } + /** + * Compiles the workflow, prepares cache metadata, initializes execution services, and starts execution. + */ def executeWorkflow(): Unit = { try { workflow = new WorkflowCompiler(workflowContext) .compile(request.logicalPlan) - val cachedOutputs = computeCachedOutputs(workflow.physicalPlan) + val cachedOutputsByPort = computeCachedOutputs(workflow.physicalPlan) + val cachedOutputs = cachedOutputsByPort.map { case (gpid, cached) => + gpid.serializeAsString -> cached + } workflowContext.workflowSettings = workflowContext.workflowSettings.copy(cachedOutputs = cachedOutputs) + val cacheUsageEntries = + buildCacheUsageEntries(workflow.physicalPlan, cachedOutputsByPort) + executionStateStore.cacheUsageStore.updateState(_ => + ExecutionCacheUsageStore(cacheUsageEntries) + ) } catch { case err: Throwable => errorHandler(err) diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index 1756ff8032..2b5abd8592 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -50,7 +50,7 @@ import org.apache.texera.amber.error.ErrorUtils.{ } import org.apache.texera.dao.jooq.generated.tables.pojos.User import org.apache.texera.service.util.LargeBinaryManager -import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent +import org.apache.texera.web.model.websocket.event.{CacheUsageUpdateEvent, TexeraWebSocketEvent} import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import org.apache.texera.web.service.WorkflowService.mkWorkflowStateId @@ -160,6 +160,10 @@ class WorkflowService( new CompositeDisposable(subscriptions :+ errorSubscription: _*) } + /** + * Subscribes to execution-scoped websocket events and emits cache usage snapshots + * so refreshed sessions can rehydrate cached output labels. + */ def connectToExecution(onNext: TexeraWebSocketEvent => Unit): Disposable = { val localDisposable = new CompositeDisposable() val disposable = executionService.subscribe { execService: WorkflowExecutionService => @@ -171,11 +175,23 @@ class WorkflowService( ) .toSeq localDisposable.addAll(subscriptions: _*) + emitCacheUsageSnapshot(execService, onNext) } // Note: this new CompositeDisposable is necessary. DO NOT OPTIMIZE. new CompositeDisposable(localDisposable, disposable) } + /** + * Sends the latest cache usage metadata for the current execution to a new subscriber. + */ + private def emitCacheUsageSnapshot( + execService: WorkflowExecutionService, + onNext: TexeraWebSocketEvent => Unit + ): Unit = { + val cachedOutputs = execService.executionStateStore.cacheUsageStore.getState.cachedOutputs + onNext(CacheUsageUpdateEvent(cachedOutputs)) + } + def disconnect(): Unit = { lifeCycleManager.decreaseUserCount( Option(executionService.getValue).map(_.executionStateStore.metadataStore.getState.state) diff --git a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheUsageStore.scala b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheUsageStore.scala new file mode 100644 index 0000000000..28d61810c7 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheUsageStore.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.storage + +import org.apache.texera.web.model.websocket.event.CachedPortUsage + +/** + * Holds cached output entries that matched the current execution's workflow fingerprint. + * + * The store is used to emit websocket updates so the frontend can render cache metadata + * (e.g., source execution IDs) without leaking result URIs. + * The updatedAt field forces a state change even when the cache list is empty. + */ +case class ExecutionCacheUsageStore( + cachedOutputs: List[CachedPortUsage] = List.empty, + updatedAt: Long = System.currentTimeMillis() +) diff --git a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala index 654acbbefd..d307c8c021 100644 --- a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala +++ b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala @@ -55,8 +55,19 @@ class ExecutionStateStore { val consoleStore = new StateStore(ExecutionConsoleStore()) val breakpointStore = new StateStore(ExecutionBreakpointStore()) val reconfigurationStore = new StateStore(ExecutionReconfigurationStore()) + val cacheUsageStore = new StateStore(ExecutionCacheUsageStore()) + /** + * Returns all state stores that should publish websocket updates for an execution. + */ def getAllStores: Iterable[StateStore[_]] = { - Iterable(statsStore, consoleStore, breakpointStore, metadataStore, reconfigurationStore) + Iterable( + statsStore, + consoleStore, + breakpointStore, + metadataStore, + reconfigurationStore, + cacheUsageStore + ) } } diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index 0ffaa2f826..7f627755e5 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -158,6 +158,17 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached - No `from_cache` flag required (can infer from numWorkers=0 + instant completion) - Lifecycle management (eviction, cleanup) deferred to future work +**Cache metadata UI (Implemented)**: +- Left panel "Cache" tab listing workflow cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) +- Highlight cache entries matched for the current execution fingerprint +- Output port labels show tuple counts, plus a second line with source execution id for cached outputs +- Result URI hidden from the UI + +**Cache usage updates**: +- `CacheUsageUpdateEvent` publishes matched cached outputs at submission time +- Frontend uses the event to drive cache entry highlighting and per-port cache labels +- Cache usage snapshots are re-emitted on websocket connect to keep labels visible after refresh + ### 5. Service & DAO Architecture #### OperatorPortCacheDao @@ -177,6 +188,13 @@ class OperatorPortCacheDao(sqlServer: SqlServer) { /** Upsert cache entry (insert or update on conflict) */ def upsert(record: OperatorPortCacheRecord): Unit + /** List cache entries for a workflow (ordered by updated_at desc) */ + def listByWorkflow( + workflowId: Long, + limit: Int, + offset: Int + ): Seq[OperatorPortCacheRecord] + /** Delete all cache entries for a workflow */ def deleteByWorkflow(workflowId: Long): Unit } @@ -222,9 +240,9 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) { #### WorkflowExecutionsResource (REST API - Optional) **Location**: `/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala` -HTTP endpoints for external access (if needed): -- `GET /cache/{workflowId}/{portId}/{hash}`: Manual cache lookup -- `DELETE /cache/{workflowId}`: Manual cache invalidation +HTTP endpoints for external access: +- `GET /executions/{workflowId}/cache?limit=<n>&offset=<n>`: List cache entries (result_uri omitted) +- (Optional) `DELETE /cache/{workflowId}`: Manual cache invalidation **Note**: Internal services use `OperatorPortCacheService`, not the REST resource. @@ -234,7 +252,7 @@ Phase 1.1 Service/DAO architecture is complete. Key components: | Component | Location | Purpose | |-----------|----------|---------| -| **OperatorPortCacheDao** | `/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` | Low-level database access using Jooq. Methods: `get()`, `upsert()`, `deleteByWorkflow()` | +| **OperatorPortCacheDao** | `/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` | Low-level database access using Jooq. Methods: `get()`, `upsert()`, `listByWorkflow()`, `deleteByWorkflow()` | | **OperatorPortCacheService** | `/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala` | High-level cache operations. Methods: `lookupCachedOutputs()`, `upsertCachedOutput()`, `invalidateWorkflowCache()` | | **ExecutionCacheService** | `/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala` | Event listener that registers callback for `PortMaterialized` events and bridges to service layer | | **PortMaterialized Event** | `/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala` | Client event emitted when output port completes with URI and tuple count | @@ -282,7 +300,7 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - Columns: `workflow_id`, `global_port_id`, `subdag_hash` (PK), `fingerprint_json`, `result_uri`, `tuple_count`, `source_execution_id`, `updated_at` - Timestamp managed by database (`DEFAULT now()`) - **Service/DAO architecture** (Phase 1.1 - Complete): - - `OperatorPortCacheDao`: Low-level database access with get/upsert/delete methods + - `OperatorPortCacheDao`: Low-level database access with get/upsert/listByWorkflow/delete methods - `OperatorPortCacheService`: High-level cache operations (lookupCachedOutputs, upsertCachedOutput, invalidateWorkflowCache) - `ExecutionCacheService`: Event listener bridging controller events to service layer - Event-based communication via `PortMaterialized` event and `client.registerCallback[T]` @@ -305,6 +323,13 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - Worker count label: cached operators show `from cache` instead of `#workers` - Region visualization: blue fill (`rgba(24,144,255,0.3)`) for cached regions in `workflow-editor.component.ts` - Region visibility: shared state via `WorkflowActionService.showRegion` ensures correct visibility when regions are created during execution +- **Cache metadata UI** (Phase 1.3 - Complete): + - `CacheUsageUpdateEvent` publishes matched cached outputs for the current execution + - Left panel "Cache" tab lists cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) + - Cache entries highlight when they match the current workflow fingerprint + - Cached output ports show source execution id on a second label line + - REST: `GET /executions/{wid}/cache` lists cache entries (result URI omitted) + - Result URI omitted from UI payloads ### Architecture Integration The cache system integrates with three layers: @@ -401,7 +426,7 @@ The cache system integrates with three layers: - `cacheService` created at workflow level - `executionCacheService` created per execution - [ ] Add unit tests for DAO operations -- [ ] (Optional) Add REST endpoints in `WorkflowExecutionsResource` that delegate to service +- [x] Add cache listing endpoint in `WorkflowExecutionsResource` (`GET /executions/{wid}/cache`) #### 1.2 Frontend Cache Visualization ✓ COMPLETE - [x] Add `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf enum @@ -426,7 +451,14 @@ The cache system integrates with three layers: - [x] Fix region visibility with shared state via `WorkflowActionService.showRegion` - Ensures regions show correctly when user toggles visibility before execution -#### 1.3 Testing & Validation +#### 1.3 Cache Metadata UI ✓ COMPLETE +- [x] Add left panel "Cache" tab listing workflow cache entries (physical op id, port id, tuple count, source execution id, updated_at, short subdag hash) +- [x] Highlight cache entries matched for the current execution fingerprint +- [x] Show per-output-port sourceExecutionId on a second output port label line +- [x] Re-emit cache usage snapshots on websocket connect to refresh cache labels after reload +- [x] Keep result URI hidden in the UI + +#### 1.4 Testing & Validation - [ ] Verify downstream cached URI consumption across all operator types - [ ] Add integration tests: cache upsert → DB verification - [ ] Add E2E tests: run → cache → rerun → verify skip diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts index 73feecfdba..52e7ebf605 100644 --- a/frontend/src/app/app.module.ts +++ b/frontend/src/app/app.module.ts @@ -84,6 +84,7 @@ import { RowModalComponent } from "./workspace/component/result-panel/result-pan import { OperatorPropertyEditFrameComponent } from "./workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component"; import { NzTabsModule } from "ng-zorro-antd/tabs"; import { VersionsListComponent } from "./workspace/component/left-panel/versions-list/versions-list.component"; +import { CachePanelComponent } from "./workspace/component/left-panel/cache-panel/cache-panel.component"; import { NzPaginationModule } from "ng-zorro-antd/pagination"; import { JwtModule } from "@auth0/angular-jwt"; import { AuthService } from "./common/service/user/auth.service"; @@ -196,6 +197,7 @@ registerLocaleData(en); SettingsComponent, PropertyEditorComponent, VersionsListComponent, + CachePanelComponent, TimeTravelComponent, WorkflowEditorComponent, ResultPanelComponent, diff --git a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts index b15f668bff..7da51bb2b5 100644 --- a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts +++ b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts @@ -22,6 +22,7 @@ import { Observable } from "rxjs"; import { AppSettings } from "../../../../common/app-setting"; import { HttpClient, HttpParams } from "@angular/common/http"; import { WorkflowExecutionsEntry } from "../../../type/workflow-executions-entry"; +import { WorkflowCacheEntry } from "../../../type/workflow-cache-entry"; import { WorkflowRuntimeStatistics } from "../../../type/workflow-runtime-statistics"; import { ExecutionState } from "../../../../workspace/types/execute-workflow.interface"; @@ -91,4 +92,19 @@ export class WorkflowExecutionsService { params, }); } + + /** + * Retrieves cache entries for the workflow, ordered by most recent update. + * Limit and offset are optional; omit to fetch all entries. + */ + retrieveWorkflowCacheEntries(wid: number, limit?: number, offset?: number): Observable<WorkflowCacheEntry[]> { + let params = new HttpParams(); + if (limit !== undefined) { + params = params.set("limit", limit.toString()); + } + if (offset !== undefined) { + params = params.set("offset", offset.toString()); + } + return this.http.get<WorkflowCacheEntry[]>(`${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache`, { params }); + } } diff --git a/frontend/src/app/dashboard/type/workflow-cache-entry.ts b/frontend/src/app/dashboard/type/workflow-cache-entry.ts new file mode 100644 index 0000000000..ae17ecf077 --- /dev/null +++ b/frontend/src/app/dashboard/type/workflow-cache-entry.ts @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Cache entry metadata returned for a workflow. + */ +export interface WorkflowCacheEntry { + globalPortId: string; + logicalOpId: string; + layerName: string; + portId: number; + internal: boolean; + subdagHash: string; + tupleCount?: number; + sourceExecutionId?: number; + updatedAt: number; +} diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html new file mode 100644 index 0000000000..39783defb3 --- /dev/null +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html @@ -0,0 +1,85 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<div class="cache-panel"> + <div class="cache-panel__actions"> + <button + nz-button + nzSize="small" + nzType="default" + [disabled]="loading" + (click)="refresh()"> + Refresh + </button> + <span + class="cache-panel__count" + *ngIf="cacheEntries.length"> + {{ cacheEntries.length }} entries + </span> + </div> + + <nz-table + nzSize="small" + [nzSimple]="true" + [nzFrontPagination]="false" + nzTableLayout="auto" + [nzLoading]="loading" + *ngIf="loading || cacheEntries.length" + [nzData]="cacheEntries"> + <thead> + <tr> + <th>Port</th> + <th>Cache Info</th> + </tr> + </thead> + <tbody> + <tr + *ngFor="let entry of cacheEntries" + [class.cache-match]="isMatched(entry)"> + <td> + <div class="cache-port"> + <div class="cache-port__op">{{ entry.logicalOpId }}</div> + <div class="cache-port__layer">{{ entry.layerName }}</div> + <div class="cache-port__id"> + port {{ entry.portId }}<span *ngIf="entry.internal"> (internal)</span> + </div> + </div> + </td> + <td> + <div class="cache-meta"> + <nz-tag + *ngIf="isMatched(entry)" + nzColor="blue"> + Matched + </nz-tag> + <div>source execution: {{ formatSourceExecutionId(entry.sourceExecutionId) }}</div> + <div>tuples: {{ formatTupleCount(entry.tupleCount) }}</div> + <div>updated: {{ entry.updatedAt ? (entry.updatedAt | date:'short') : '-' }}</div> + <div>hash: {{ shortenSubdagHash(entry.subdagHash) }}</div> + </div> + </td> + </tr> + </tbody> + </nz-table> + + <nz-empty + *ngIf="!loading && !cacheEntries.length" + nzNotFoundContent="No cache entries"> + </nz-empty> +</div> diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss new file mode 100644 index 0000000000..402fac73b2 --- /dev/null +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss @@ -0,0 +1,37 @@ +.cache-panel { + display: flex; + flex-direction: column; + gap: 8px; +} + +.cache-panel__actions { + display: flex; + align-items: center; + gap: 8px; +} + +.cache-panel__count { + color: #666; + font-size: 12px; +} + +.cache-port__op { + font-weight: 600; +} + +.cache-port__layer, +.cache-port__id { + color: #666; + font-size: 12px; +} + +.cache-meta { + display: flex; + flex-direction: column; + gap: 2px; + font-size: 12px; +} + +tr.cache-match { + background-color: rgba(24, 144, 255, 0.08); +} diff --git a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts new file mode 100644 index 0000000000..33ca2e7463 --- /dev/null +++ b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Component, OnInit } from "@angular/core"; +import { ActivatedRoute } from "@angular/router"; +import { finalize } from "rxjs/operators"; +import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; +import { WorkflowExecutionsService } from "../../../../dashboard/service/user/workflow-executions/workflow-executions.service"; +import { WorkflowCacheEntry } from "../../../../dashboard/type/workflow-cache-entry"; +import { CacheUsageService } from "../../../service/workflow-status/cache-usage.service"; + +/** + * CachePanelComponent renders cache entry metadata for the current workflow. + */ +@UntilDestroy() +@Component({ + selector: "texera-cache-panel", + templateUrl: "cache-panel.component.html", + styleUrls: ["cache-panel.component.scss"], +}) +export class CachePanelComponent implements OnInit { + public cacheEntries: WorkflowCacheEntry[] = []; + public loading = false; + private workflowId?: number; + private usageKeys = new Set<string>(); + + constructor( + private workflowExecutionsService: WorkflowExecutionsService, + private cacheUsageService: CacheUsageService, + private route: ActivatedRoute + ) {} + + ngOnInit(): void { + const workflowId = Number(this.route.snapshot.params.id); + if (!workflowId) { + return; + } + this.workflowId = workflowId; + this.refresh(); + this.cacheUsageService + .getCacheUsageStream() + .pipe(untilDestroyed(this)) + .subscribe(entries => { + this.usageKeys = new Set( + entries.map(entry => this.cacheUsageService.buildUsageKey(entry.globalPortId, entry.subdagHash)) + ); + }); + } + + /** + * Refreshes the cache entry list from the backend. + */ + public refresh(): void { + if (!this.workflowId) { + return; + } + this.loading = true; + this.workflowExecutionsService + .retrieveWorkflowCacheEntries(this.workflowId) + .pipe( + finalize(() => { + this.loading = false; + }), + untilDestroyed(this) + ) + .subscribe(entries => { + this.cacheEntries = entries; + }); + } + + /** + * Returns true when a cache entry matches the current execution fingerprint. + */ + public isMatched(entry: WorkflowCacheEntry): boolean { + return this.usageKeys.has(this.cacheUsageService.buildUsageKey(entry.globalPortId, entry.subdagHash)); + } + + /** + * Formats tuple counts for display. + */ + public formatTupleCount(tupleCount?: number): string { + return tupleCount === undefined ? "-" : tupleCount.toString(); + } + + /** + * Formats source execution IDs for display. + */ + public formatSourceExecutionId(sourceExecutionId?: number): string { + return sourceExecutionId === undefined ? "-" : sourceExecutionId.toString(); + } + + /** + * Shortens subDAG hash for compact display. + */ + public shortenSubdagHash(hash: string): string { + return hash.length > 8 ? hash.slice(0, 8) : hash; + } +} diff --git a/frontend/src/app/workspace/component/left-panel/left-panel.component.ts b/frontend/src/app/workspace/component/left-panel/left-panel.component.ts index 8dd9a5748b..3a6db75604 100644 --- a/frontend/src/app/workspace/component/left-panel/left-panel.component.ts +++ b/frontend/src/app/workspace/component/left-panel/left-panel.component.ts @@ -23,6 +23,7 @@ import { NzResizeEvent } from "ng-zorro-antd/resizable"; import { CdkDragDrop, moveItemInArray } from "@angular/cdk/drag-drop"; import { OperatorMenuComponent } from "./operator-menu/operator-menu.component"; import { VersionsListComponent } from "./versions-list/versions-list.component"; +import { CachePanelComponent } from "./cache-panel/cache-panel.component"; import { WorkflowExecutionHistoryComponent } from "../../../dashboard/component/user/user-workflow/ngbd-modal-workflow-executions/workflow-execution-history.component"; import { TimeTravelComponent } from "./time-travel/time-travel.component"; import { SettingsComponent } from "./settings/settings.component"; @@ -51,6 +52,7 @@ export class LeftPanelComponent implements OnDestroy, OnInit, AfterViewInit { { component: null, title: "", icon: "", enabled: true }, { component: OperatorMenuComponent, title: "Operators", icon: "appstore", enabled: true }, { component: VersionsListComponent, title: "Versions", icon: "schedule", enabled: true }, + { component: CachePanelComponent, title: "Cache", icon: "database", enabled: true }, { component: SettingsComponent, title: "Settings", @@ -93,9 +95,12 @@ export class LeftPanelComponent implements OnDestroy, OnInit, AfterViewInit { this.height = Number(localStorage.getItem("left-panel-height")) || this.height; } + /** + * Applies feature flags to left panel items that are conditionally enabled. + */ private updateItemsWithConfig(): void { - this.items[4].enabled = this.config.env.workflowExecutionsTrackingEnabled; // Execution History - this.items[5].enabled = this.config.env.timetravelEnabled; // Time Travel + this.items[5].enabled = this.config.env.workflowExecutionsTrackingEnabled; // Execution History + this.items[6].enabled = this.config.env.timetravelEnabled; // Time Travel } ngOnInit(): void { diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index 2d43f80a61..45b6857327 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -27,6 +27,7 @@ import { ExecuteWorkflowService } from "../../service/execute-workflow/execute-w import { fromJointPaperEvent, JointUIService, linkPathStrokeColor } from "../../service/joint-ui/joint-ui.service"; import { ValidationWorkflowService } from "../../service/validation/validation-workflow.service"; import { WorkflowActionService } from "../../service/workflow-graph/model/workflow-action.service"; +import { CacheUsageService } from "../../service/workflow-status/cache-usage.service"; import { WorkflowStatusService } from "../../service/workflow-status/workflow-status.service"; import { ExecutionState, OperatorState } from "../../types/execute-workflow.interface"; import { LogicalPort, OperatorLink } from "../../types/workflow-common.interface"; @@ -102,6 +103,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy private validationWorkflowService: ValidationWorkflowService, private jointUIService: JointUIService, private workflowStatusService: WorkflowStatusService, + private cacheUsageService: CacheUsageService, private executeWorkflowService: ExecuteWorkflowService, private nzModalService: NzModalService, private changeDetectorRef: ChangeDetectorRef, @@ -163,6 +165,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy this.handlePortHighlightEvent(); this.registerPortDisplayNameChangeHandler(); this.handleOperatorStatisticsUpdate(); + this.handleCacheUsageUpdate(); this.handleRegionEvents(); this.handleOperatorSuggestionHighlightEvent(); this.handleElementDelete(); @@ -303,7 +306,8 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy op.operatorID, status[op.operatorID], this.isSource(op.operatorID), - this.isSink(op.operatorID) + this.isSink(op.operatorID), + this.cacheUsageService.getPortCacheLabels(op.operatorID) ); }); }); @@ -333,6 +337,27 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy }); } + /** + * Updates cached output port labels whenever cache usage metadata changes. + */ + private handleCacheUsageUpdate(): void { + this.cacheUsageService + .getCacheUsageStream() + .pipe(untilDestroyed(this)) + .subscribe(() => { + this.workflowActionService + .getTexeraGraph() + .getAllOperators() + .forEach(op => { + this.jointUIService.changeOperatorCacheLabels( + this.paper, + op.operatorID, + this.cacheUsageService.getPortCacheLabels(op.operatorID) + ); + }); + }); + } + private handleRegionEvents(): void { const Region = joint.dia.Element.define( "region", diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts index d89624a2c8..69095ff3e0 100644 --- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts +++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts @@ -236,6 +236,7 @@ export class JointUIService { width: JointUIService.DEFAULT_OPERATOR_WIDTH, height: JointUIService.DEFAULT_OPERATOR_HEIGHT, }, + portLabelMarkup: JointUIService.getCustomPortLabelMarkup(), attrs: JointUIService.getCustomOperatorStyleAttrs( operator, operator.customDisplayName ?? operatorSchema.additionalMetadata.userFriendlyName, @@ -244,8 +245,26 @@ export class JointUIService { ), ports: { groups: { - in: { attrs: JointUIService.getCustomPortStyleAttrs() }, - out: { attrs: JointUIService.getCustomPortStyleAttrs() }, + in: { + attrs: JointUIService.getCustomPortStyleAttrs(), + markup: JointUIService.getCustomPortMarkup(), + label: { + position: { + name: "left", + args: { x: -5, y: 10 }, + }, + }, + }, + out: { + attrs: JointUIService.getCustomPortStyleAttrs(), + markup: JointUIService.getCustomPortMarkup(), + label: { + position: { + name: "right", + args: { x: 5, y: -10 }, + }, + }, + }, }, }, markup: TexeraCustomJointElement.getMarkup( @@ -299,12 +318,17 @@ export class JointUIService { return operatorElement; } + /** + * Updates operator state, worker labels, and per-port counts using latest statistics. + * Cache labels are applied for cached outputs and positioned to avoid overlapping the outgoing edge. + */ public changeOperatorStatistics( jointPaper: joint.dia.Paper, operatorID: string, statistics: OperatorStatistics | undefined, isSource: boolean, - isSink: boolean + isSink: boolean, + cachePortLabels?: Record<string, string> ): void { if (!statistics) { this.changeOperatorState(jointPaper, operatorID, OperatorState.Uninitialized); @@ -362,14 +386,43 @@ export class JointUIService { originalName = portId; } - const labelText = - isSkippedFromCache && count === undefined ? "-" : String(count ?? 0); - - element.portProp(portId, "attrs/.port-label/text", labelText); + const baseLabel = isSkippedFromCache && count === undefined ? "-" : String(count ?? 0); + element.portProp(portId, "attrs/.port-label/text", baseLabel); } }); + const effectiveCacheLabels = isSkippedFromCache ? cachePortLabels : undefined; + this.changeOperatorCacheLabels(jointPaper, operatorID, effectiveCacheLabels); this.changeOperatorState(jointPaper, operatorID, statistics.operatorState); } + + /** + * Updates cache usage labels for output ports without changing counts or operator state. + */ + public changeOperatorCacheLabels( + jointPaper: joint.dia.Paper, + operatorID: string, + cachePortLabels?: Record<string, string> + ): void { + const element = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model; + if (!element) { + return; + } + const outPorts = element.getPorts().filter(p => p.group === "out"); + outPorts.forEach(portDef => { + const portId = portDef.id; + if (portId != null) { + const parts = portId.split("-"); + const numericSuffix = parts.length > 1 ? parts[1] : portId; + const cacheLabel = cachePortLabels?.[numericSuffix] ?? ""; + element.portProp(portId, "attrs/.port-cache-label/text", cacheLabel); + element.portProp( + portId, + "attrs/.port-cache-label/transform", + cacheLabel ? "translate(0, 12)" : "" + ); + } + }); + } public foldOperatorDetails(jointPaper: joint.dia.Paper, operatorID: string): void { jointPaper.getModelById(operatorID).attr({ [`.${operatorStateClass}`]: { visibility: "hidden" }, @@ -435,6 +488,7 @@ export class JointUIService { inPorts.forEach(p => { if (p.id != null) { element.portProp(p.id, "attrs/.port-label/fill", fillColor); + element.portProp(p.id, "attrs/.port-cache-label/fill", fillColor); } }); @@ -442,6 +496,7 @@ export class JointUIService { outPorts.forEach(p => { if (p.id != null) { element.portProp(p.id, "attrs/.port-label/fill", fillColor); + element.portProp(p.id, "attrs/.port-cache-label/fill", fillColor); } }); } @@ -611,6 +666,7 @@ export class JointUIService { /** * This function changes the default svg of the operator ports. * It hides the port label that will display 'out/in' beside the operators. + * Port labels remain visible for per-port metrics and cache metadata. * * @returns the custom attributes of the ports */ @@ -633,6 +689,43 @@ export class JointUIService { }; } + /** + * Defines the default markup for ports. + */ + public static getCustomPortMarkup(): any[] { + return [ + { + tagName: "circle", + selector: ".port-body", + attributes: { + class: "port-body", + }, + }, + ]; + } + + /** + * Defines the default port label markup for counts and cache metadata. + */ + public static getCustomPortLabelMarkup(): any[] { + return [ + { + tagName: "text", + selector: ".port-label", + attributes: { + class: "port-label", + }, + }, + { + tagName: "text", + selector: ".port-cache-label", + attributes: { + class: "port-cache-label", + }, + }, + ]; + } + /** * This function create a custom svg style for the operator * @returns the custom attributes of the tooltip. diff --git a/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts b/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts new file mode 100644 index 0000000000..59f10e8d54 --- /dev/null +++ b/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Injectable } from "@angular/core"; +import { BehaviorSubject, Observable } from "rxjs"; +import { CachedPortUsage } from "../../types/workflow-websocket.interface"; +import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service"; + +/** + * Stores cache usage metadata emitted for the current execution and exposes helpers + * for rendering per-port cache labels and cache entry highlights. + */ +@Injectable({ + providedIn: "root", +}) +export class CacheUsageService { + private readonly cacheUsageSubject = new BehaviorSubject<ReadonlyArray<CachedPortUsage>>([]); + + constructor(private workflowWebsocketService: WorkflowWebsocketService) { + this.registerCacheUsageListener(); + } + + /** + * Returns a stream of cached output metadata matched for the current execution. + */ + public getCacheUsageStream(): Observable<ReadonlyArray<CachedPortUsage>> { + return this.cacheUsageSubject.asObservable(); + } + + /** + * Returns the latest cached output metadata snapshot. + */ + public getCacheUsageSnapshot(): ReadonlyArray<CachedPortUsage> { + return this.cacheUsageSubject.value; + } + + /** + * Builds label text per output port for a logical operator, keyed by port id. + * Each label is a single line (e.g., "src 42") that is rendered below the port count. + */ + public getPortCacheLabels(operatorId: string): Record<string, string> { + const labels: Record<string, string> = {}; + this.cacheUsageSubject.value + .filter(entry => entry.logicalOpId === operatorId) + .forEach(entry => { + const portKey = entry.portId.toString(); + const executionId = entry.sourceExecutionId ?? "unknown"; + labels[portKey] = `src ${executionId}`; + }); + return labels; + } + + /** + * Builds a stable key to match cache entries against cache usage updates. + */ + public buildUsageKey(globalPortId: string, subdagHash: string): string { + return `${globalPortId}|${subdagHash}`; + } + + private registerCacheUsageListener(): void { + this.workflowWebsocketService.subscribeToEvent("CacheUsageUpdateEvent").subscribe(event => { + this.cacheUsageSubject.next(event.cachedOutputs); + }); + } +} diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts b/frontend/src/app/workspace/types/workflow-websocket.interface.ts index afd5ea6f04..e22633f213 100644 --- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts +++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts @@ -141,6 +141,22 @@ export interface CacheStatusUpdateEvent cacheStatusMap: Record<string, OperatorResultCacheStatus>; }> {} +export type CachedPortUsage = Readonly<{ + globalPortId: string; + logicalOpId: string; + layerName: string; + portId: number; + internal: boolean; + subdagHash: string; + tupleCount?: number; + sourceExecutionId?: number; +}>; + +export interface CacheUsageUpdateEvent + extends Readonly<{ + cachedOutputs: ReadonlyArray<CachedPortUsage>; + }> {} + export type PythonExpressionEvaluateRequest = Readonly<{ expression: string; operatorId: string; @@ -235,6 +251,7 @@ export type TexeraWebsocketEventTypeMap = { ResultExportResponse: ResultExportResponse; WorkflowAvailableResultEvent: WorkflowAvailableResultEvent; CacheStatusUpdateEvent: CacheStatusUpdateEvent; + CacheUsageUpdateEvent: CacheUsageUpdateEvent; PythonExpressionEvaluateResponse: PythonExpressionEvaluateResponse; WorkerAssignmentUpdateEvent: WorkerAssignmentUpdateEvent; ModifyLogicResponse: ModifyLogicResponse;
