This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 84433a3825dd3928534ac4b5a343c323d772ae8a
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Wed Jan 14 19:00:45 2026 -0800

    feat(cache): display cache information on the frontend.
---
 .../texera/web/dao/OperatorPortCacheDao.scala      |  59 ++++++++++-
 .../websocket/event/CacheUsageUpdateEvent.scala    |  48 +++++++++
 .../websocket/event/TexeraWebSocketEvent.scala     |   1 +
 .../user/workflow/WorkflowExecutionsResource.scala |  67 ++++++++++++
 .../web/service/WorkflowExecutionService.scala     |  60 +++++++++--
 .../texera/web/service/WorkflowService.scala       |  18 +++-
 .../web/storage/ExecutionCacheUsageStore.scala     |  34 ++++++
 .../texera/web/storage/ExecutionStateStore.scala   |  13 ++-
 docs/operator-port-cache.md                        |  46 +++++++--
 frontend/src/app/app.module.ts                     |   2 +
 .../workflow-executions.service.ts                 |  16 +++
 .../src/app/dashboard/type/workflow-cache-entry.ts |  33 ++++++
 .../cache-panel/cache-panel.component.html         |  85 +++++++++++++++
 .../cache-panel/cache-panel.component.scss         |  37 +++++++
 .../cache-panel/cache-panel.component.ts           | 114 +++++++++++++++++++++
 .../component/left-panel/left-panel.component.ts   |   9 +-
 .../workflow-editor/workflow-editor.component.ts   |  27 ++++-
 .../workspace/service/joint-ui/joint-ui.service.ts | 107 +++++++++++++++++--
 .../service/workflow-status/cache-usage.service.ts |  81 +++++++++++++++
 .../types/workflow-websocket.interface.ts          |  17 +++
 20 files changed, 846 insertions(+), 28 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala 
b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
index f6bf30efb2..c1af475b3a 100644
--- a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
+++ b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
@@ -24,6 +24,8 @@ import 
org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_CACHE
 import org.jooq.DSLContext
 
 import java.net.URI
+import java.time.OffsetDateTime
+import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
 
 /**
@@ -36,6 +38,8 @@ import scala.jdk.OptionConverters._
   * @param resultUri URI of the materialized output
   * @param tupleCount Number of tuples in the cached output (optional)
   * @param sourceExecutionId Execution ID that produced this cache entry 
(optional)
+  * @param updatedAt Last update timestamp for this cache entry (optional; 
DB-managed).
+  *                  Uses OffsetDateTime to align with Jooq's TIMESTAMPTZ 
mapping.
   *
   * Note: updated_at timestamp is managed by the database (DEFAULT now())
   */
@@ -46,7 +50,8 @@ case class OperatorPortCacheRecord(
     fingerprintJson: String,
     resultUri: URI,
     tupleCount: Option[Long],
-    sourceExecutionId: Option[Long]
+    sourceExecutionId: Option[Long],
+    updatedAt: Option[OffsetDateTime] = None
 )
 
 /**
@@ -79,7 +84,8 @@ class OperatorPortCacheDao(sqlServer: SqlServer) {
         OPERATOR_PORT_CACHE.FINGERPRINT_JSON,
         OPERATOR_PORT_CACHE.RESULT_URI,
         OPERATOR_PORT_CACHE.TUPLE_COUNT,
-        OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID
+        OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID,
+        OPERATOR_PORT_CACHE.UPDATED_AT
       )
       .from(OPERATOR_PORT_CACHE)
       .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt))
@@ -95,11 +101,58 @@ class OperatorPortCacheDao(sqlServer: SqlServer) {
           fingerprintJson = record.value4(),
           resultUri = URI.create(record.value5()),
           tupleCount = Option(record.value6()).map(_.longValue()),
-          sourceExecutionId = Option(record.value7()).map(_.longValue())
+          sourceExecutionId = Option(record.value7()).map(_.longValue()),
+          updatedAt = Option(record.value8())
         )
       }
   }
 
+  /**
+    * List cache entries for a workflow, ordered by most recent update.
+    *
+    * @param workflowId Workflow ID to list cache entries for
+    * @param limit Max number of entries to return
+    * @param offset Offset into the result set for pagination
+    * @return Cache entries ordered by updated_at descending
+    */
+  def listByWorkflow(
+      workflowId: Long,
+      limit: Int,
+      offset: Int
+  ): Seq[OperatorPortCacheRecord] = {
+    context
+      .select(
+        OPERATOR_PORT_CACHE.WORKFLOW_ID,
+        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
+        OPERATOR_PORT_CACHE.SUBDAG_HASH,
+        OPERATOR_PORT_CACHE.FINGERPRINT_JSON,
+        OPERATOR_PORT_CACHE.RESULT_URI,
+        OPERATOR_PORT_CACHE.TUPLE_COUNT,
+        OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID,
+        OPERATOR_PORT_CACHE.UPDATED_AT
+      )
+      .from(OPERATOR_PORT_CACHE)
+      .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt))
+      .orderBy(OPERATOR_PORT_CACHE.UPDATED_AT.desc())
+      .limit(limit)
+      .offset(offset)
+      .fetch()
+      .asScala
+      .map(record =>
+        OperatorPortCacheRecord(
+          workflowId = record.value1().longValue(),
+          globalPortId = record.value2(),
+          subdagHash = record.value3(),
+          fingerprintJson = record.value4(),
+          resultUri = URI.create(record.value5()),
+          tupleCount = Option(record.value6()).map(_.longValue()),
+          sourceExecutionId = Option(record.value7()).map(_.longValue()),
+          updatedAt = Option(record.value8())
+        )
+      )
+      .toSeq
+  }
+
   /**
     * Insert or update a cache entry (upsert).
     * On conflict (workflow_id, global_port_id, subdag_hash), updates the 
existing record.
diff --git 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheUsageUpdateEvent.scala
 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheUsageUpdateEvent.scala
new file mode 100644
index 0000000000..153d98248f
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/CacheUsageUpdateEvent.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.model.websocket.event
+
+/**
+  * Cache usage metadata for a single output port matched during 
submission-time lookup.
+  *
+  * @param globalPortId Serialized GlobalPortIdentity string
+  * @param logicalOpId Logical operator id owning the port
+  * @param layerName Physical operator layer name
+  * @param portId Output port id
+  * @param internal Whether the port is internal
+  * @param subdagHash SHA-256 hash of the upstream subDAG fingerprint
+  * @param tupleCount Cached tuple count (optional)
+  * @param sourceExecutionId Execution id that produced the cached output 
(optional)
+  */
+case class CachedPortUsage(
+    globalPortId: String,
+    logicalOpId: String,
+    layerName: String,
+    portId: Int,
+    internal: Boolean,
+    subdagHash: String,
+    tupleCount: Option[Long],
+    sourceExecutionId: Option[Long]
+)
+
+/**
+  * Websocket event that surfaces cache metadata matched for the current 
execution.
+  */
+case class CacheUsageUpdateEvent(cachedOutputs: List[CachedPortUsage]) extends 
TexeraWebSocketEvent
diff --git 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
index da072c80ea..a4953c3e1e 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
@@ -35,6 +35,7 @@ import 
org.apache.texera.web.model.websocket.response.{HeartBeatResponse, Modify
     new Type(value = classOf[WebResultUpdateEvent]),
     new Type(value = classOf[ConsoleUpdateEvent]),
     new Type(value = classOf[CacheStatusUpdateEvent]),
+    new Type(value = classOf[CacheUsageUpdateEvent]),
     new Type(value = classOf[PaginatedResultEvent]),
     new Type(value = classOf[PythonExpressionEvaluateResponse]),
     new Type(value = classOf[WorkerAssignmentUpdateEvent]),
diff --git 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
index f0b661e524..52d3f43123 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
@@ -33,6 +33,7 @@ import 
org.apache.texera.amber.engine.architecture.logreplay.{ReplayDestination,
 import org.apache.texera.amber.engine.common.Utils.{maptoStatusCode, 
stringToAggregatedState}
 import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage
 import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde
 import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
 import org.apache.texera.auth.{JwtParser, SessionUser}
 import org.apache.texera.dao.SqlServer
@@ -41,6 +42,7 @@ import org.apache.texera.dao.jooq.generated.Tables._
 import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum
 import org.apache.texera.dao.jooq.generated.tables.daos.WorkflowExecutionsDao
 import org.apache.texera.dao.jooq.generated.tables.pojos.{WorkflowExecutions, 
User => UserPojo}
+import org.apache.texera.web.dao.OperatorPortCacheDao
 import org.apache.texera.web.model.http.request.result.ResultExportRequest
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._
 import org.apache.texera.web.service.{ExecutionsMetadataPersistService, 
ResultExportService}
@@ -554,6 +556,23 @@ object WorkflowExecutionsResource {
       numWorkers: Int,
       status: Int
   )
+
+  /**
+    * Cache entry metadata returned for a workflow.
+    *
+    * result_uri is intentionally omitted from the payload.
+    */
+  case class WorkflowCacheEntry(
+      globalPortId: String,
+      logicalOpId: String,
+      layerName: String,
+      portId: Int,
+      internal: Boolean,
+      subdagHash: String,
+      tupleCount: Option[Long],
+      sourceExecutionId: Option[Long],
+      updatedAt: Timestamp
+  )
 }
 
 case class ExecutionGroupBookmarkRequest(
@@ -748,6 +767,54 @@ class WorkflowExecutionsResource {
       .toList
   }
 
+  /**
+    * Returns cache entries for a workflow, ordered by most recent update.
+    *
+    * @param wid workflow ID
+    * @param sessionUser authenticated user
+    * @param limit max entries to return (optional; missing or non-positive means no limit)
+    * @param offset pagination offset (optional; missing or negative means 0)
+    */
+  @GET
+  @Produces(Array(MediaType.APPLICATION_JSON))
+  @Path("/{wid}/cache")
+  @RolesAllowed(Array("REGULAR", "ADMIN"))
+  def retrieveWorkflowCacheEntries(
+      @PathParam("wid") wid: Integer,
+      @Auth sessionUser: SessionUser,
+      @QueryParam("limit") limit: Integer,
+      @QueryParam("offset") offset: Integer
+  ): List[WorkflowCacheEntry] = {
+    validateUserCanAccessWorkflow(sessionUser.getUser.getUid, wid)
+
+    val effectiveLimit =
+      Option(limit).map(_.toInt).filter(_ > 0).getOrElse(Int.MaxValue)
+    val effectiveOffset =
+      Option(offset).map(_.toInt).filter(_ >= 0).getOrElse(0)
+
+    val dao = new OperatorPortCacheDao(SqlServer.getInstance())
+    dao
+      .listByWorkflow(wid.toLong, effectiveLimit, effectiveOffset)
+      .map { record =>
+        val globalPortId =
+          GlobalPortIdentitySerde.deserializeFromString(record.globalPortId)
+        WorkflowCacheEntry(
+          globalPortId = record.globalPortId,
+          logicalOpId = globalPortId.opId.logicalOpId.id,
+          layerName = globalPortId.opId.layerName,
+          portId = globalPortId.portId.id,
+          internal = globalPortId.portId.internal,
+          subdagHash = record.subdagHash,
+          tupleCount = record.tupleCount,
+          sourceExecutionId = record.sourceExecutionId,
+          updatedAt = record.updatedAt
+            .map(odt => Timestamp.from(odt.toInstant))
+            .getOrElse(new Timestamp(0L))
+        )
+      }
+      .toList
+  }
+
   /** Sets a group of executions' bookmarks to the payload passed in the body. 
*/
   @PUT
   @Consumes(Array(MediaType.APPLICATION_JSON))
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
index 7777aaf7ab..4e0e841645 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
@@ -29,15 +29,18 @@ import org.apache.texera.amber.engine.common.Utils
 import org.apache.texera.amber.engine.common.client.AmberClient
 import 
org.apache.texera.amber.engine.common.executionruntimestate.ExecutionMetadataStore
 import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity}
+import org.apache.texera.amber.core.workflow.cache.FingerprintUtil
 import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
 import org.apache.texera.web.model.websocket.event.{
+  CacheUsageUpdateEvent,
+  CachedPortUsage,
   TexeraWebSocketEvent,
   WorkflowErrorEvent,
   WorkflowStateEvent
 }
 import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
-import org.apache.texera.web.storage.ExecutionStateStore
+import org.apache.texera.web.storage.{ExecutionCacheUsageStore, 
ExecutionStateStore}
 import org.apache.texera.web.storage.ExecutionStateStore.updateWorkflowState
 import org.apache.texera.web.{ComputingUnitMaster, SubscriptionManager, 
WebsocketInput}
 import org.apache.texera.workflow.WorkflowCompiler
@@ -87,6 +90,11 @@ class WorkflowExecutionService(
       outputEvents
     })
   )
+  addSubscription(
+    executionStateStore.cacheUsageStore.registerDiffHandler((_, newState) => {
+      Iterable(CacheUsageUpdateEvent(newState.cachedOutputs))
+    })
+  )
 
   private def createStateEvent(state: ExecutionMetadataStore): 
WorkflowStateEvent = {
     if (state.isRecovering && state.state != COMPLETED) {
@@ -107,21 +115,61 @@ class WorkflowExecutionService(
   var executionConsoleService: ExecutionConsoleService = _
   var executionCacheService: ExecutionCacheService = _
 
+  /**
+    * Lookup cached outputs for the physical plan and return them keyed by 
GlobalPortIdentity.
+    *
+    * This is used both for workflow settings (serialized key map) and for 
cache
+    * metadata updates sent to the UI.
+    */
   private def computeCachedOutputs(
       physicalPlan: org.apache.texera.amber.core.workflow.PhysicalPlan
-  ): Map[String, CachedOutput] = {
-    cacheService
-      .lookupCachedOutputs(workflowContext.workflowId, physicalPlan)
-      .map { case (gpid, cached) => gpid.serializeAsString -> cached }
+  ): Map[GlobalPortIdentity, CachedOutput] = {
+    cacheService.lookupCachedOutputs(workflowContext.workflowId, physicalPlan)
+  }
+
+  /**
+    * Build cache usage metadata for the current execution from matched cached 
outputs.
+    */
+  private def buildCacheUsageEntries(
+      physicalPlan: org.apache.texera.amber.core.workflow.PhysicalPlan,
+      cachedOutputs: Map[GlobalPortIdentity, CachedOutput]
+  ): List[CachedPortUsage] = {
+    cachedOutputs.toList
+      .map {
+        case (gpid, cached) =>
+          val fingerprint = 
FingerprintUtil.computeSubdagFingerprint(physicalPlan, gpid)
+          CachedPortUsage(
+            globalPortId = gpid.serializeAsString,
+            logicalOpId = gpid.opId.logicalOpId.id,
+            layerName = gpid.opId.layerName,
+            portId = gpid.portId.id,
+            internal = gpid.portId.internal,
+            subdagHash = fingerprint.subdagHash,
+            tupleCount = cached.tupleCount,
+            sourceExecutionId = cached.sourceExecutionId.map(_.id)
+          )
+      }
+      .sortBy(entry => (entry.logicalOpId, entry.layerName, entry.portId))
   }
 
+  /**
+    * Compiles the workflow, prepares cache metadata, initializes execution 
services, and starts execution.
+    */
   def executeWorkflow(): Unit = {
     try {
       workflow = new WorkflowCompiler(workflowContext)
         .compile(request.logicalPlan)
-      val cachedOutputs = computeCachedOutputs(workflow.physicalPlan)
+      val cachedOutputsByPort = computeCachedOutputs(workflow.physicalPlan)
+      val cachedOutputs = cachedOutputsByPort.map { case (gpid, cached) =>
+        gpid.serializeAsString -> cached
+      }
       workflowContext.workflowSettings =
         workflowContext.workflowSettings.copy(cachedOutputs = cachedOutputs)
+      val cacheUsageEntries =
+        buildCacheUsageEntries(workflow.physicalPlan, cachedOutputsByPort)
+      executionStateStore.cacheUsageStore.updateState(_ =>
+        ExecutionCacheUsageStore(cacheUsageEntries)
+      )
     } catch {
       case err: Throwable =>
         errorHandler(err)
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index 1756ff8032..2b5abd8592 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -50,7 +50,7 @@ import org.apache.texera.amber.error.ErrorUtils.{
 }
 import org.apache.texera.dao.jooq.generated.tables.pojos.User
 import org.apache.texera.service.util.LargeBinaryManager
-import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
+import org.apache.texera.web.model.websocket.event.{CacheUsageUpdateEvent, 
TexeraWebSocketEvent}
 import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
 import org.apache.texera.web.service.WorkflowService.mkWorkflowStateId
@@ -160,6 +160,10 @@ class WorkflowService(
     new CompositeDisposable(subscriptions :+ errorSubscription: _*)
   }
 
+  /**
+    * Subscribes to execution-scoped websocket events and emits cache usage 
snapshots
+    * so refreshed sessions can rehydrate cached output labels.
+    */
   def connectToExecution(onNext: TexeraWebSocketEvent => Unit): Disposable = {
     val localDisposable = new CompositeDisposable()
     val disposable = executionService.subscribe { execService: 
WorkflowExecutionService =>
@@ -171,11 +175,23 @@ class WorkflowService(
         )
         .toSeq
       localDisposable.addAll(subscriptions: _*)
+      emitCacheUsageSnapshot(execService, onNext)
     }
     // Note: this new CompositeDisposable is necessary. DO NOT OPTIMIZE.
     new CompositeDisposable(localDisposable, disposable)
   }
 
+  /**
+    * Sends the latest cache usage metadata for the current execution to a new 
subscriber.
+    */
+  private def emitCacheUsageSnapshot(
+      execService: WorkflowExecutionService,
+      onNext: TexeraWebSocketEvent => Unit
+  ): Unit = {
+    val cachedOutputs = 
execService.executionStateStore.cacheUsageStore.getState.cachedOutputs
+    onNext(CacheUsageUpdateEvent(cachedOutputs))
+  }
+
   def disconnect(): Unit = {
     lifeCycleManager.decreaseUserCount(
       
Option(executionService.getValue).map(_.executionStateStore.metadataStore.getState.state)
diff --git 
a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheUsageStore.scala
 
b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheUsageStore.scala
new file mode 100644
index 0000000000..28d61810c7
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionCacheUsageStore.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.storage
+
+import org.apache.texera.web.model.websocket.event.CachedPortUsage
+
+/**
+  * Holds cached output entries that matched the current execution's workflow 
fingerprint.
+  *
+  * The store is used to emit websocket updates so the frontend can render 
cache metadata
+  * (e.g., source execution IDs) without leaking result URIs.
+  * The updatedAt field forces a state change even when the cache list is 
empty.
+  */
+case class ExecutionCacheUsageStore(
+    cachedOutputs: List[CachedPortUsage] = List.empty,
+    updatedAt: Long = System.currentTimeMillis()
+)
diff --git 
a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala 
b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
index 654acbbefd..d307c8c021 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
@@ -55,8 +55,19 @@ class ExecutionStateStore {
   val consoleStore = new StateStore(ExecutionConsoleStore())
   val breakpointStore = new StateStore(ExecutionBreakpointStore())
   val reconfigurationStore = new StateStore(ExecutionReconfigurationStore())
+  val cacheUsageStore = new StateStore(ExecutionCacheUsageStore())
 
+  /**
+    * Returns all state stores that should publish websocket updates for an 
execution.
+    */
   def getAllStores: Iterable[StateStore[_]] = {
-    Iterable(statsStore, consoleStore, breakpointStore, metadataStore, 
reconfigurationStore)
+    Iterable(
+      statsStore,
+      consoleStore,
+      breakpointStore,
+      metadataStore,
+      reconfigurationStore,
+      cacheUsageStore
+    )
   }
 }
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index 0ffaa2f826..7f627755e5 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -158,6 +158,17 @@ Entry point: `RegionExecutionCoordinator` constructor 
branches on `region.cached
 - No `from_cache` flag required (can infer from numWorkers=0 + instant 
completion)
 - Lifecycle management (eviction, cleanup) deferred to future work
 
+**Cache metadata UI (Implemented)**:
+- Left panel "Cache" tab listing workflow cache entries (physical op id, port 
id, tuple count, source execution id, updated_at, short subdag hash)
+- Highlight cache entries matched for the current execution fingerprint
+- Output port labels show tuple counts, plus a second line with source 
execution id for cached outputs
+- Result URI hidden from the UI
+
+**Cache usage updates**:
+- `CacheUsageUpdateEvent` publishes matched cached outputs at submission time
+- Frontend uses the event to drive cache entry highlighting and per-port cache 
labels
+- Cache usage snapshots are re-emitted on websocket connect to keep labels 
visible after refresh
+
 ### 5. Service & DAO Architecture
 
 #### OperatorPortCacheDao
@@ -177,6 +188,13 @@ class OperatorPortCacheDao(sqlServer: SqlServer) {
   /** Upsert cache entry (insert or update on conflict) */
   def upsert(record: OperatorPortCacheRecord): Unit
 
+  /** List cache entries for a workflow (ordered by updated_at desc) */
+  def listByWorkflow(
+      workflowId: Long,
+      limit: Int,
+      offset: Int
+  ): Seq[OperatorPortCacheRecord]
+
   /** Delete all cache entries for a workflow */
   def deleteByWorkflow(workflowId: Long): Unit
 }
@@ -222,9 +240,9 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) {
 #### WorkflowExecutionsResource (REST API - Optional)
 **Location**: 
`/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala`
 
-HTTP endpoints for external access (if needed):
-- `GET /cache/{workflowId}/{portId}/{hash}`: Manual cache lookup
-- `DELETE /cache/{workflowId}`: Manual cache invalidation
+HTTP endpoints for external access:
+- `GET /executions/{workflowId}/cache?limit=<n>&offset=<n>`: List cache 
entries (result_uri omitted)
+- (Optional) `DELETE /cache/{workflowId}`: Manual cache invalidation
 
 **Note**: Internal services use `OperatorPortCacheService`, not the REST 
resource.
 
@@ -234,7 +252,7 @@ Phase 1.1 Service/DAO architecture is complete. Key 
components:
 
 | Component | Location | Purpose |
 |-----------|----------|---------|
-| **OperatorPortCacheDao** | 
`/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` | 
Low-level database access using Jooq. Methods: `get()`, `upsert()`, 
`deleteByWorkflow()` |
+| **OperatorPortCacheDao** | 
`/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` | 
Low-level database access using Jooq. Methods: `get()`, `upsert()`, 
`listByWorkflow()`, `deleteByWorkflow()` |
 | **OperatorPortCacheService** | 
`/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
 | High-level cache operations. Methods: `lookupCachedOutputs()`, 
`upsertCachedOutput()`, `invalidateWorkflowCache()` |
 | **ExecutionCacheService** | 
`/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala`
 | Event listener that registers callback for `PortMaterialized` events and 
bridges to service layer |
 | **PortMaterialized Event** | 
`/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala`
 | Client event emitted when output port completes with URI and tuple count |
@@ -282,7 +300,7 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
   - Columns: `workflow_id`, `global_port_id`, `subdag_hash` (PK), 
`fingerprint_json`, `result_uri`, `tuple_count`, `source_execution_id`, 
`updated_at`
   - Timestamp managed by database (`DEFAULT now()`)
 - **Service/DAO architecture** (Phase 1.1 - Complete):
-  - `OperatorPortCacheDao`: Low-level database access with get/upsert/delete 
methods
+  - `OperatorPortCacheDao`: Low-level database access with 
get/upsert/listByWorkflow/delete methods
   - `OperatorPortCacheService`: High-level cache operations 
(lookupCachedOutputs, upsertCachedOutput, invalidateWorkflowCache)
   - `ExecutionCacheService`: Event listener bridging controller events to 
service layer
   - Event-based communication via `PortMaterialized` event and 
`client.registerCallback[T]`
@@ -305,6 +323,13 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
   - Worker count label: cached operators show `from cache` instead of 
`#workers`
   - Region visualization: blue fill (`rgba(24,144,255,0.3)`) for cached 
regions in `workflow-editor.component.ts`
   - Region visibility: shared state via `WorkflowActionService.showRegion` 
ensures correct visibility when regions are created during execution
+- **Cache metadata UI** (Phase 1.3 - Complete):
+  - `CacheUsageUpdateEvent` publishes matched cached outputs for the current 
execution
+  - Left panel "Cache" tab lists cache entries (physical op id, port id, tuple 
count, source execution id, updated_at, short subdag hash)
+  - Cache entries highlight when they match the current workflow fingerprint
+  - Cached output ports show source execution id on a second label line
+  - REST: `GET /executions/{wid}/cache` lists cache entries (result URI 
omitted)
+  - Result URI omitted from both the REST and websocket payloads sent to the UI
 
 ### Architecture Integration
 The cache system integrates with three layers:
@@ -401,7 +426,7 @@ The cache system integrates with three layers:
   - `cacheService` created at workflow level
   - `executionCacheService` created per execution
 - [ ] Add unit tests for DAO operations
-- [ ] (Optional) Add REST endpoints in `WorkflowExecutionsResource` that 
delegate to service
+- [x] Add cache listing endpoint in `WorkflowExecutionsResource` (`GET 
/executions/{wid}/cache`)
 
 #### 1.2 Frontend Cache Visualization ✓ COMPLETE
 - [x] Add `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf 
enum
@@ -426,7 +451,14 @@ The cache system integrates with three layers:
 - [x] Fix region visibility with shared state via 
`WorkflowActionService.showRegion`
   - Ensures regions show correctly when user toggles visibility before 
execution
 
-#### 1.3 Testing & Validation
+#### 1.3 Cache Metadata UI ✓ COMPLETE
+- [x] Add left panel "Cache" tab listing workflow cache entries (physical op 
id, port id, tuple count, source execution id, updated_at, short subdag hash)
+- [x] Highlight cache entries matched for the current execution fingerprint
+- [x] Show per-output-port sourceExecutionId on a second output port label line
+- [x] Re-emit cache usage snapshots on websocket connect to refresh cache 
labels after reload
+- [x] Keep result URI hidden in the UI
+
+#### 1.4 Testing & Validation
 - [ ] Verify downstream cached URI consumption across all operator types
 - [ ] Add integration tests: cache upsert → DB verification
 - [ ] Add E2E tests: run → cache → rerun → verify skip
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts
index 73feecfdba..52e7ebf605 100644
--- a/frontend/src/app/app.module.ts
+++ b/frontend/src/app/app.module.ts
@@ -84,6 +84,7 @@ import { RowModalComponent } from 
"./workspace/component/result-panel/result-pan
 import { OperatorPropertyEditFrameComponent } from 
"./workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component";
 import { NzTabsModule } from "ng-zorro-antd/tabs";
 import { VersionsListComponent } from 
"./workspace/component/left-panel/versions-list/versions-list.component";
+import { CachePanelComponent } from 
"./workspace/component/left-panel/cache-panel/cache-panel.component";
 import { NzPaginationModule } from "ng-zorro-antd/pagination";
 import { JwtModule } from "@auth0/angular-jwt";
 import { AuthService } from "./common/service/user/auth.service";
@@ -196,6 +197,7 @@ registerLocaleData(en);
     SettingsComponent,
     PropertyEditorComponent,
     VersionsListComponent,
+    CachePanelComponent,
     TimeTravelComponent,
     WorkflowEditorComponent,
     ResultPanelComponent,
diff --git 
a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
 
b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
index b15f668bff..7da51bb2b5 100644
--- 
a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
+++ 
b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts
@@ -22,6 +22,7 @@ import { Observable } from "rxjs";
 import { AppSettings } from "../../../../common/app-setting";
 import { HttpClient, HttpParams } from "@angular/common/http";
 import { WorkflowExecutionsEntry } from 
"../../../type/workflow-executions-entry";
+import { WorkflowCacheEntry } from "../../../type/workflow-cache-entry";
 import { WorkflowRuntimeStatistics } from 
"../../../type/workflow-runtime-statistics";
 import { ExecutionState } from 
"../../../../workspace/types/execute-workflow.interface";
 
@@ -91,4 +92,19 @@ export class WorkflowExecutionsService {
       params,
     });
   }
+
+  /**
+   * Fetches the cache entries of workflow `wid` with a GET request to
+   * `${WORKFLOW_EXECUTIONS_API_BASE_URL}/{wid}/cache`.
+   * Entries are expected to arrive ordered by most recent update; that ordering
+   * is produced by the backend, not by this method.
+   *
+   * @param wid id of the workflow, placed in the request path
+   * @param limit optional; sent as the `limit` query parameter only when provided
+   * @param offset optional; sent as the `offset` query parameter only when provided
+   * @returns an observable of the entries returned by the endpoint
+   */
+  retrieveWorkflowCacheEntries(wid: number, limit?: number, offset?: number): Observable<WorkflowCacheEntry[]> {
+    const paging: Array<[string, number | undefined]> = [
+      ["limit", limit],
+      ["offset", offset],
+    ];
+    // HttpParams is immutable, so fold the provided values into a fresh instance.
+    const params = paging.reduce(
+      (acc, [key, value]) => (value === undefined ? acc : acc.set(key, value.toString())),
+      new HttpParams()
+    );
+    const url = `${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/cache`;
+    return this.http.get<WorkflowCacheEntry[]>(url, { params });
+  }
 }
diff --git a/frontend/src/app/dashboard/type/workflow-cache-entry.ts 
b/frontend/src/app/dashboard/type/workflow-cache-entry.ts
new file mode 100644
index 0000000000..ae17ecf077
--- /dev/null
+++ b/frontend/src/app/dashboard/type/workflow-cache-entry.ts
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Cache entry metadata returned for a workflow.
+ * This is the element type of the array produced by
+ * `WorkflowExecutionsService.retrieveWorkflowCacheEntries`.
+ */
+export interface WorkflowCacheEntry {
+  // Combined with subdagHash to build the key used to match an entry against cache usage updates.
+  globalPortId: string;
+  // Shown as the operator line of the cache panel's "Port" column.
+  logicalOpId: string;
+  // Shown under the operator id in the cache panel.
+  layerName: string;
+  // Shown as "port <portId>" in the cache panel.
+  portId: number;
+  // When true the cache panel appends "(internal)" to the port line.
+  internal: boolean;
+  // Second half of the usage key; the cache panel displays only its first 8 characters.
+  subdagHash: string;
+  // Optional; the cache panel renders "-" when it is absent.
+  tupleCount?: number;
+  // Optional; the cache panel renders "-" when it is absent.
+  sourceExecutionId?: number;
+  // Rendered through Angular's date pipe.
+  // NOTE(review): presumably epoch milliseconds — confirm against the backend.
+  updatedAt: number;
+}
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
new file mode 100644
index 0000000000..39783defb3
--- /dev/null
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.html
@@ -0,0 +1,85 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<div class="cache-panel">
+  <div class="cache-panel__actions">
+    <button
+      nz-button
+      nzSize="small"
+      nzType="default"
+      [disabled]="loading"
+      (click)="refresh()">
+      Refresh
+    </button>
+    <span
+      class="cache-panel__count"
+      *ngIf="cacheEntries.length">
+      {{ cacheEntries.length }} entries
+    </span>
+  </div>
+
+  <nz-table
+    nzSize="small"
+    [nzSimple]="true"
+    [nzFrontPagination]="false"
+    nzTableLayout="auto"
+    [nzLoading]="loading"
+    *ngIf="loading || cacheEntries.length"
+    [nzData]="cacheEntries">
+    <thead>
+      <tr>
+        <th>Port</th>
+        <th>Cache Info</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr
+        *ngFor="let entry of cacheEntries"
+        [class.cache-match]="isMatched(entry)">
+        <td>
+          <div class="cache-port">
+            <div class="cache-port__op">{{ entry.logicalOpId }}</div>
+            <div class="cache-port__layer">{{ entry.layerName }}</div>
+            <div class="cache-port__id">
+              port {{ entry.portId }}<span *ngIf="entry.internal"> 
(internal)</span>
+            </div>
+          </div>
+        </td>
+        <td>
+          <div class="cache-meta">
+            <nz-tag
+              *ngIf="isMatched(entry)"
+              nzColor="blue">
+              Matched
+            </nz-tag>
+            <div>source execution: {{ 
formatSourceExecutionId(entry.sourceExecutionId) }}</div>
+            <div>tuples: {{ formatTupleCount(entry.tupleCount) }}</div>
+            <div>updated: {{ entry.updatedAt ? (entry.updatedAt | 
date:'short') : '-' }}</div>
+            <div>hash: {{ shortenSubdagHash(entry.subdagHash) }}</div>
+          </div>
+        </td>
+      </tr>
+    </tbody>
+  </nz-table>
+
+  <nz-empty
+    *ngIf="!loading && !cacheEntries.length"
+    nzNotFoundContent="No cache entries">
+  </nz-empty>
+</div>
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss
new file mode 100644
index 0000000000..402fac73b2
--- /dev/null
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.scss
@@ -0,0 +1,37 @@
+.cache-panel {
+  display: flex;
+  flex-direction: column;
+  gap: 8px;
+}
+
+.cache-panel__actions {
+  display: flex;
+  align-items: center;
+  gap: 8px;
+}
+
+.cache-panel__count {
+  color: #666;
+  font-size: 12px;
+}
+
+.cache-port__op {
+  font-weight: 600;
+}
+
+.cache-port__layer,
+.cache-port__id {
+  color: #666;
+  font-size: 12px;
+}
+
+.cache-meta {
+  display: flex;
+  flex-direction: column;
+  gap: 2px;
+  font-size: 12px;
+}
+
+tr.cache-match {
+  background-color: rgba(24, 144, 255, 0.08);
+}
diff --git 
a/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
new file mode 100644
index 0000000000..33ca2e7463
--- /dev/null
+++ 
b/frontend/src/app/workspace/component/left-panel/cache-panel/cache-panel.component.ts
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Component, OnInit } from "@angular/core";
+import { ActivatedRoute } from "@angular/router";
+import { finalize } from "rxjs/operators";
+import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
+import { WorkflowExecutionsService } from 
"../../../../dashboard/service/user/workflow-executions/workflow-executions.service";
+import { WorkflowCacheEntry } from 
"../../../../dashboard/type/workflow-cache-entry";
+import { CacheUsageService } from 
"../../../service/workflow-status/cache-usage.service";
+
+/**
+ * CachePanelComponent renders cache entry metadata for the current workflow.
+ *
+ * The workflow id is read once from the route snapshot in `ngOnInit`. Entries are
+ * loaded through `WorkflowExecutionsService`, and an entry is reported as matched
+ * when its (globalPortId, subdagHash) key is present in the latest cache usage
+ * update published by `CacheUsageService`.
+ */
+@UntilDestroy()
+@Component({
+  selector: "texera-cache-panel",
+  templateUrl: "cache-panel.component.html",
+  styleUrls: ["cache-panel.component.scss"],
+})
+export class CachePanelComponent implements OnInit {
+  /** Entries most recently returned by the backend; iterated by the template. */
+  public cacheEntries: WorkflowCacheEntry[] = [];
+  /** True while a refresh request is in flight. */
+  public loading = false;
+  /** Workflow id taken from the route; stays undefined when the route has no usable id. */
+  private workflowId?: number;
+  /** Usage keys of the latest cache usage update, used by `isMatched`. */
+  private usageKeys = new Set<string>();
+
+  constructor(
+    private workflowExecutionsService: WorkflowExecutionsService,
+    private cacheUsageService: CacheUsageService,
+    private route: ActivatedRoute
+  ) {}
+
+  /**
+   * Resolves the workflow id, loads the entries once, and keeps `usageKeys`
+   * in sync with the cache usage stream until the component is destroyed.
+   */
+  ngOnInit(): void {
+    // Number(...) yields NaN for a missing or non-numeric id; NaN and 0 are both falsy.
+    const workflowId = Number(this.route.snapshot.params.id);
+    if (!workflowId) {
+      return;
+    }
+    this.workflowId = workflowId;
+    this.refresh();
+    this.cacheUsageService
+      .getCacheUsageStream()
+      .pipe(untilDestroyed(this))
+      .subscribe(entries => {
+        this.usageKeys = new Set(
+          entries.map(entry => this.cacheUsageService.buildUsageKey(entry.globalPortId, entry.subdagHash))
+        );
+      });
+  }
+
+  /**
+   * Refreshes the cache entry list from the backend.
+   * Does nothing when no workflow id was resolved.
+   */
+  public refresh(): void {
+    if (!this.workflowId) {
+      return;
+    }
+    this.loading = true;
+    this.workflowExecutionsService
+      .retrieveWorkflowCacheEntries(this.workflowId)
+      .pipe(
+        // finalize runs on completion, error and unsubscribe, so the spinner is always cleared.
+        finalize(() => {
+          this.loading = false;
+        }),
+        untilDestroyed(this)
+      )
+      .subscribe(entries => {
+        this.cacheEntries = entries;
+      });
+  }
+
+  /**
+   * Returns true when a cache entry matches the current execution fingerprint,
+   * i.e. its usage key is contained in the latest cache usage update.
+   */
+  public isMatched(entry: WorkflowCacheEntry): boolean {
+    return this.usageKeys.has(this.cacheUsageService.buildUsageKey(entry.globalPortId, entry.subdagHash));
+  }
+
+  /**
+   * Formats tuple counts for display.
+   *
+   * @param tupleCount count to render; may be absent
+   * @returns the decimal string, or "-" when the count is undefined or null
+   */
+  public formatTupleCount(tupleCount?: number | null): string {
+    // A JSON payload can carry an explicit null for an absent optional field;
+    // guarding only against undefined would make `.toString()` throw on it.
+    return tupleCount === undefined || tupleCount === null ? "-" : tupleCount.toString();
+  }
+
+  /**
+   * Formats source execution IDs for display.
+   *
+   * @param sourceExecutionId id to render; may be absent
+   * @returns the decimal string, or "-" when the id is undefined or null
+   */
+  public formatSourceExecutionId(sourceExecutionId?: number | null): string {
+    // Same null guard as formatTupleCount.
+    return sourceExecutionId === undefined || sourceExecutionId === null ? "-" : sourceExecutionId.toString();
+  }
+
+  /**
+   * Shortens subDAG hash for compact display.
+   *
+   * @returns the first 8 characters of `hash`, or `hash` itself when it is shorter
+   */
+  public shortenSubdagHash(hash: string): string {
+    // slice already returns the whole string when it has 8 characters or fewer.
+    return hash.slice(0, 8);
+  }
+}
diff --git 
a/frontend/src/app/workspace/component/left-panel/left-panel.component.ts 
b/frontend/src/app/workspace/component/left-panel/left-panel.component.ts
index 8dd9a5748b..3a6db75604 100644
--- a/frontend/src/app/workspace/component/left-panel/left-panel.component.ts
+++ b/frontend/src/app/workspace/component/left-panel/left-panel.component.ts
@@ -23,6 +23,7 @@ import { NzResizeEvent } from "ng-zorro-antd/resizable";
 import { CdkDragDrop, moveItemInArray } from "@angular/cdk/drag-drop";
 import { OperatorMenuComponent } from 
"./operator-menu/operator-menu.component";
 import { VersionsListComponent } from 
"./versions-list/versions-list.component";
+import { CachePanelComponent } from "./cache-panel/cache-panel.component";
 import { WorkflowExecutionHistoryComponent } from 
"../../../dashboard/component/user/user-workflow/ngbd-modal-workflow-executions/workflow-execution-history.component";
 import { TimeTravelComponent } from "./time-travel/time-travel.component";
 import { SettingsComponent } from "./settings/settings.component";
@@ -51,6 +52,7 @@ export class LeftPanelComponent implements OnDestroy, OnInit, 
AfterViewInit {
     { component: null, title: "", icon: "", enabled: true },
     { component: OperatorMenuComponent, title: "Operators", icon: "appstore", 
enabled: true },
     { component: VersionsListComponent, title: "Versions", icon: "schedule", 
enabled: true },
+    { component: CachePanelComponent, title: "Cache", icon: "database", 
enabled: true },
     {
       component: SettingsComponent,
       title: "Settings",
@@ -93,9 +95,12 @@ export class LeftPanelComponent implements OnDestroy, 
OnInit, AfterViewInit {
     this.height = Number(localStorage.getItem("left-panel-height")) || 
this.height;
   }
 
+  /**
+   * Applies feature flags to left panel items that are conditionally enabled.
+   */
   private updateItemsWithConfig(): void {
-    this.items[4].enabled = this.config.env.workflowExecutionsTrackingEnabled; 
// Execution History
-    this.items[5].enabled = this.config.env.timetravelEnabled; // Time Travel
+    this.items[5].enabled = this.config.env.workflowExecutionsTrackingEnabled; 
// Execution History
+    this.items[6].enabled = this.config.env.timetravelEnabled; // Time Travel
   }
 
   ngOnInit(): void {
diff --git 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
index 2d43f80a61..45b6857327 100644
--- 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
+++ 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
@@ -27,6 +27,7 @@ import { ExecuteWorkflowService } from 
"../../service/execute-workflow/execute-w
 import { fromJointPaperEvent, JointUIService, linkPathStrokeColor } from 
"../../service/joint-ui/joint-ui.service";
 import { ValidationWorkflowService } from 
"../../service/validation/validation-workflow.service";
 import { WorkflowActionService } from 
"../../service/workflow-graph/model/workflow-action.service";
+import { CacheUsageService } from 
"../../service/workflow-status/cache-usage.service";
 import { WorkflowStatusService } from 
"../../service/workflow-status/workflow-status.service";
 import { ExecutionState, OperatorState } from 
"../../types/execute-workflow.interface";
 import { LogicalPort, OperatorLink } from 
"../../types/workflow-common.interface";
@@ -102,6 +103,7 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
     private validationWorkflowService: ValidationWorkflowService,
     private jointUIService: JointUIService,
     private workflowStatusService: WorkflowStatusService,
+    private cacheUsageService: CacheUsageService,
     private executeWorkflowService: ExecuteWorkflowService,
     private nzModalService: NzModalService,
     private changeDetectorRef: ChangeDetectorRef,
@@ -163,6 +165,7 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
     this.handlePortHighlightEvent();
     this.registerPortDisplayNameChangeHandler();
     this.handleOperatorStatisticsUpdate();
+    this.handleCacheUsageUpdate();
     this.handleRegionEvents();
     this.handleOperatorSuggestionHighlightEvent();
     this.handleElementDelete();
@@ -303,7 +306,8 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
               op.operatorID,
               status[op.operatorID],
               this.isSource(op.operatorID),
-              this.isSink(op.operatorID)
+              this.isSink(op.operatorID),
+              this.cacheUsageService.getPortCacheLabels(op.operatorID)
             );
           });
       });
@@ -333,6 +337,27 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
       });
   }
 
+  /**
+   * Subscribes to the cache usage stream and, on every emission, re-applies the
+   * cache labels of each operator in the Texera graph. The subscription ends
+   * when this component is destroyed.
+   */
+  private handleCacheUsageUpdate(): void {
+    const relabelAllOperators = (): void => {
+      const operators = this.workflowActionService.getTexeraGraph().getAllOperators();
+      for (const { operatorID } of operators) {
+        const labels = this.cacheUsageService.getPortCacheLabels(operatorID);
+        this.jointUIService.changeOperatorCacheLabels(this.paper, operatorID, labels);
+      }
+    };
+    this.cacheUsageService.getCacheUsageStream().pipe(untilDestroyed(this)).subscribe(relabelAllOperators);
+  }
+
   private handleRegionEvents(): void {
     const Region = joint.dia.Element.define(
       "region",
diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts 
b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
index d89624a2c8..69095ff3e0 100644
--- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
+++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
@@ -236,6 +236,7 @@ export class JointUIService {
         width: JointUIService.DEFAULT_OPERATOR_WIDTH,
         height: JointUIService.DEFAULT_OPERATOR_HEIGHT,
       },
+      portLabelMarkup: JointUIService.getCustomPortLabelMarkup(),
       attrs: JointUIService.getCustomOperatorStyleAttrs(
         operator,
         operator.customDisplayName ?? 
operatorSchema.additionalMetadata.userFriendlyName,
@@ -244,8 +245,26 @@ export class JointUIService {
       ),
       ports: {
         groups: {
-          in: { attrs: JointUIService.getCustomPortStyleAttrs() },
-          out: { attrs: JointUIService.getCustomPortStyleAttrs() },
+          in: {
+            attrs: JointUIService.getCustomPortStyleAttrs(),
+            markup: JointUIService.getCustomPortMarkup(),
+            label: {
+              position: {
+                name: "left",
+                args: { x: -5, y: 10 },
+              },
+            },
+          },
+          out: {
+            attrs: JointUIService.getCustomPortStyleAttrs(),
+            markup: JointUIService.getCustomPortMarkup(),
+            label: {
+              position: {
+                name: "right",
+                args: { x: 5, y: -10 },
+              },
+            },
+          },
         },
       },
       markup: TexeraCustomJointElement.getMarkup(
@@ -299,12 +318,17 @@ export class JointUIService {
     return operatorElement;
   }
 
+  /**
+   * Updates operator state, worker labels, and per-port counts using latest 
statistics.
+   * Cache labels are applied for cached outputs and positioned to avoid 
overlapping the outgoing edge.
+   */
   public changeOperatorStatistics(
     jointPaper: joint.dia.Paper,
     operatorID: string,
     statistics: OperatorStatistics | undefined,
     isSource: boolean,
-    isSink: boolean
+    isSink: boolean,
+    cachePortLabels?: Record<string, string>
   ): void {
     if (!statistics) {
       this.changeOperatorState(jointPaper, operatorID, 
OperatorState.Uninitialized);
@@ -362,14 +386,43 @@ export class JointUIService {
           originalName = portId;
         }
 
-        const labelText =
-          isSkippedFromCache && count === undefined ? "-" : String(count ?? 0);
-
-        element.portProp(portId, "attrs/.port-label/text", labelText);
+        const baseLabel = isSkippedFromCache && count === undefined ? "-" : 
String(count ?? 0);
+        element.portProp(portId, "attrs/.port-label/text", baseLabel);
       }
     });
+    const effectiveCacheLabels = isSkippedFromCache ? cachePortLabels : 
undefined;
+    this.changeOperatorCacheLabels(jointPaper, operatorID, 
effectiveCacheLabels);
     this.changeOperatorState(jointPaper, operatorID, statistics.operatorState);
   }
+
+  /**
+   * Sets the `.port-cache-label` text of every output port of an operator,
+   * leaving port counts and operator state untouched.
+   *
+   * @param jointPaper paper that holds the operator element
+   * @param operatorID id of the operator element; nothing happens when it is not on the paper
+   * @param cachePortLabels label text keyed by the segment after the first "-" of the port id
+   *   (or by the whole port id when it contains no "-"); ports without an entry,
+   *   or all ports when this is omitted, get an empty label
+   */
+  public changeOperatorCacheLabels(
+    jointPaper: joint.dia.Paper,
+    operatorID: string,
+    cachePortLabels?: Record<string, string>
+  ): void {
+    const operatorModel = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model;
+    if (!operatorModel) {
+      return;
+    }
+    for (const port of operatorModel.getPorts()) {
+      const portId = port.id;
+      if (port.group !== "out" || portId == null) {
+        continue;
+      }
+      const segments = portId.split("-");
+      const labelKey = segments.length > 1 ? segments[1] : portId;
+      const text = cachePortLabels?.[labelKey] ?? "";
+      // The label is shifted down only while it has text; an empty label clears the transform.
+      const transform = text ? "translate(0, 12)" : "";
+      operatorModel.portProp(portId, "attrs/.port-cache-label/text", text);
+      operatorModel.portProp(portId, "attrs/.port-cache-label/transform", transform);
+    }
+  }
   public foldOperatorDetails(jointPaper: joint.dia.Paper, operatorID: string): 
void {
     jointPaper.getModelById(operatorID).attr({
       [`.${operatorStateClass}`]: { visibility: "hidden" },
@@ -435,6 +488,7 @@ export class JointUIService {
     inPorts.forEach(p => {
       if (p.id != null) {
         element.portProp(p.id, "attrs/.port-label/fill", fillColor);
+        element.portProp(p.id, "attrs/.port-cache-label/fill", fillColor);
       }
     });
 
@@ -442,6 +496,7 @@ export class JointUIService {
     outPorts.forEach(p => {
       if (p.id != null) {
         element.portProp(p.id, "attrs/.port-label/fill", fillColor);
+        element.portProp(p.id, "attrs/.port-cache-label/fill", fillColor);
       }
     });
   }
@@ -611,6 +666,7 @@ export class JointUIService {
   /**
    * This function changes the default svg of the operator ports.
    * It hides the port label that will display 'out/in' beside the operators.
+   * Port labels remain visible for per-port metrics and cache metadata.
    *
    * @returns the custom attributes of the ports
    */
@@ -633,6 +689,43 @@ export class JointUIService {
     };
   }
 
+  /**
+   * Builds the markup used for ports: a single `circle` element addressed by
+   * the `.port-body` selector and carrying the `port-body` class.
+   *
+   * @returns a new one-element markup array on every call
+   */
+  public static getCustomPortMarkup(): any[] {
+    const portBody = {
+      tagName: "circle",
+      selector: ".port-body",
+      attributes: { class: "port-body" },
+    };
+    return [portBody];
+  }
+
+  /**
+   * Builds the markup used for port labels: two `text` elements, `.port-label`
+   * followed by `.port-cache-label`, each carrying the class of the same name.
+   *
+   * @returns a new two-element markup array on every call
+   */
+  public static getCustomPortLabelMarkup(): any[] {
+    const countLabel = {
+      tagName: "text",
+      selector: ".port-label",
+      attributes: { class: "port-label" },
+    };
+    const cacheLabel = {
+      tagName: "text",
+      selector: ".port-cache-label",
+      attributes: { class: "port-cache-label" },
+    };
+    return [countLabel, cacheLabel];
+  }
+
   /**
    * This function create a custom svg style for the operator
    * @returns the custom attributes of the tooltip.
diff --git 
a/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts 
b/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts
new file mode 100644
index 0000000000..59f10e8d54
--- /dev/null
+++ b/frontend/src/app/workspace/service/workflow-status/cache-usage.service.ts
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Injectable } from "@angular/core";
+import { BehaviorSubject, Observable } from "rxjs";
+import { CachedPortUsage } from "../../types/workflow-websocket.interface";
+import { WorkflowWebsocketService } from 
"../workflow-websocket/workflow-websocket.service";
+
+/**
+ * Keeps the cache usage metadata carried by the latest `CacheUsageUpdateEvent`
+ * and offers helpers for rendering per-port cache labels and for matching
+ * cache entries against that metadata.
+ */
+@Injectable({
+  providedIn: "root",
+})
+export class CacheUsageService {
+  // Starts with an empty list so subscribers receive a value before any event arrives.
+  private readonly cacheUsageSubject = new BehaviorSubject<ReadonlyArray<CachedPortUsage>>([]);
+
+  constructor(private workflowWebsocketService: WorkflowWebsocketService) {
+    this.registerCacheUsageListener();
+  }
+
+  /**
+   * Exposes the cached output metadata as an observable; it emits the current
+   * value on subscription and again on every cache usage update.
+   */
+  public getCacheUsageStream(): Observable<ReadonlyArray<CachedPortUsage>> {
+    return this.cacheUsageSubject.asObservable();
+  }
+
+  /**
+   * Returns the cached output metadata held right now.
+   */
+  public getCacheUsageSnapshot(): ReadonlyArray<CachedPortUsage> {
+    return this.cacheUsageSubject.getValue();
+  }
+
+  /**
+   * Collects the cache labels of one logical operator, keyed by the port id as a string.
+   * A label reads "src <sourceExecutionId>", or "src unknown" when the id is absent.
+   * When several entries share a port id, the last one in the snapshot wins.
+   */
+  public getPortCacheLabels(operatorId: string): Record<string, string> {
+    const labels: Record<string, string> = {};
+    for (const usage of this.cacheUsageSubject.getValue()) {
+      if (usage.logicalOpId !== operatorId) {
+        continue;
+      }
+      const source = usage.sourceExecutionId ?? "unknown";
+      labels[usage.portId.toString()] = `src ${source}`;
+    }
+    return labels;
+  }
+
+  /**
+   * Joins a global port id and a subDAG hash with "|" into the key used to
+   * match cache entries against cache usage updates.
+   */
+  public buildUsageKey(globalPortId: string, subdagHash: string): string {
+    return globalPortId + "|" + subdagHash;
+  }
+
+  /**
+   * Forwards the `cachedOutputs` of every `CacheUsageUpdateEvent` to the subject.
+   */
+  private registerCacheUsageListener(): void {
+    const cacheUsageEvents = this.workflowWebsocketService.subscribeToEvent("CacheUsageUpdateEvent");
+    cacheUsageEvents.subscribe(({ cachedOutputs }) => {
+      this.cacheUsageSubject.next(cachedOutputs);
+    });
+  }
+}
diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts 
b/frontend/src/app/workspace/types/workflow-websocket.interface.ts
index afd5ea6f04..e22633f213 100644
--- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts
+++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts
@@ -141,6 +141,22 @@ export interface CacheStatusUpdateEvent
     cacheStatusMap: Record<string, OperatorResultCacheStatus>;
   }> {}
 
+/**
+ * Metadata of one cached output port; the element type of
+ * `CacheUsageUpdateEvent.cachedOutputs`.
+ */
+export type CachedPortUsage = Readonly<{
+  // Combined with subdagHash by CacheUsageService.buildUsageKey.
+  globalPortId: string;
+  // Compared against the operator id when CacheUsageService collects port labels.
+  logicalOpId: string;
+  layerName: string;
+  // Stringified to key the per-port label map.
+  portId: number;
+  internal: boolean;
+  subdagHash: string;
+  tupleCount?: number;
+  // Rendered as "src <id>"; the label falls back to "src unknown" when absent.
+  sourceExecutionId?: number;
+}>;
+
+/**
+ * Websocket event carrying the list of cached output ports.
+ * It is registered under the "CacheUsageUpdateEvent" key of
+ * `TexeraWebsocketEventTypeMap`, and CacheUsageService republishes
+ * `cachedOutputs` to its subscribers.
+ */
+export interface CacheUsageUpdateEvent
+  extends Readonly<{
+    cachedOutputs: ReadonlyArray<CachedPortUsage>;
+  }> {}
+
 export type PythonExpressionEvaluateRequest = Readonly<{
   expression: string;
   operatorId: string;
@@ -235,6 +251,7 @@ export type TexeraWebsocketEventTypeMap = {
   ResultExportResponse: ResultExportResponse;
   WorkflowAvailableResultEvent: WorkflowAvailableResultEvent;
   CacheStatusUpdateEvent: CacheStatusUpdateEvent;
+  CacheUsageUpdateEvent: CacheUsageUpdateEvent;
   PythonExpressionEvaluateResponse: PythonExpressionEvaluateResponse;
   WorkerAssignmentUpdateEvent: WorkerAssignmentUpdateEvent;
   ModifyLogicResponse: ModifyLogicResponse;

Reply via email to