This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit acaf348c5fb099decc4c476b5c137378bccf99ad
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Jan 12 19:27:19 2026 -0800

    feat(cache): refactor amber client-side services and cache upsert logic.
---
 .../architecture/controller/ClientEvent.scala      |   6 +
 .../promisehandlers/PortCompletedHandler.scala     |  32 +-
 .../texera/web/dao/OperatorPortCacheDao.scala      | 147 +++++++++
 .../texera/web/service/ExecutionCacheService.scala |  70 ++++
 .../web/service/OperatorPortCacheService.scala     | 144 ++++++++
 .../web/service/WorkflowExecutionService.scala     |  27 +-
 .../texera/web/service/WorkflowService.scala       |   7 +
 docs/operator-port-cache.md                        | 363 +++++----------------
 8 files changed, 469 insertions(+), 327 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala
index ea83eedd2a..9848ba5b0a 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala
@@ -46,3 +46,9 @@ case class UpdateExecutorCompleted(id: ActorVirtualIdentity) 
extends ClientEvent
 final case class ReplayStatusUpdate(id: ActorVirtualIdentity, status: Boolean) 
extends ClientEvent
 
 final case class WorkflowRecoveryStatus(isRecovering: Boolean) extends 
ClientEvent
+
+/**
+  * Client event describing a port whose result has been materialized.
+  *
+  * @param portId global identity of the port
+  * @param resultUri URI of the materialized result for that port
+  * @param tupleCount number of tuples in the result, or None when the count is unknown
+  */
+final case class PortMaterialized(
+    portId: org.apache.texera.amber.core.workflow.GlobalPortIdentity,
+    resultUri: java.net.URI,
+    tupleCount: Option[Long]
+) extends ClientEvent
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
index a2c7894bc2..a613c7cef9 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
@@ -23,8 +23,7 @@ import com.twitter.util.Future
 import org.apache.texera.amber.core.WorkflowRuntimeException
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.workflow.GlobalPortIdentity
-import org.apache.texera.amber.core.workflow.cache.FingerprintUtil
-import 
org.apache.texera.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer,
 FatalError}
+import 
org.apache.texera.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer,
 FatalError, PortMaterialized}
 import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext,
 PortCompletedRequest, QueryStatisticsRequest}
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
@@ -70,32 +69,21 @@ trait PortCompletedHandler {
               else operatorExecution.isOutputPortCompleted(msg.portId)
 
             if (isPortCompleted) {
-              // If this is an output port and materialized, persist cache 
metadata.
+              // If this is an output port and materialized, notify client 
(for cache upsert).
               if (!msg.input) {
                 val storageUriOpt =
                   WorkflowExecutionsResource.getResultUriByPhysicalPortId(
                     cp.workflowContext.executionId,
                     globalPortId
                   )
-                (storageUriOpt, Option(cp.workflowScheduler.physicalPlan)) 
match {
-                  case (Some(uri), Some(plan)) =>
-                    val fingerprint = 
FingerprintUtil.computeSubdagFingerprint(plan, globalPortId)
-                    val tupleCount =
-                      try {
-                        Some(DocumentFactory.openDocument(uri)._1.getCount)
-                      } catch {
-                        case _: Throwable => None
-                      }
-                    WorkflowExecutionsResource.upsertOperatorPortCache(
-                      cp.workflowContext.workflowId,
-                      globalPortId,
-                      fingerprint.subdagHash,
-                      fingerprint.fingerprintJson,
-                      uri,
-                      tupleCount,
-                      Some(cp.workflowContext.executionId)
-                    )
-                  case _ => // no-op if plan or URI is missing
+                storageUriOpt.foreach { uri =>
+                  val tupleCount =
+                    try {
+                      Some(DocumentFactory.openDocument(uri)._1.getCount)
+                    } catch {
+                      case _: Throwable => None
+                    }
+                  sendToClient(PortMaterialized(globalPortId, uri, tupleCount))
                 }
               }
 
diff --git 
a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala 
b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
new file mode 100644
index 0000000000..f6bf30efb2
--- /dev/null
+++ b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.dao
+
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_CACHE
+import org.jooq.DSLContext
+
+import java.net.URI
+import scala.jdk.OptionConverters._
+
+/**
+  * Record representing a cache entry in the operator_port_cache table.
+  *
+  * @param workflowId Workflow ID
+  * @param globalPortId Serialized GlobalPortIdentity
+  * @param subdagHash SHA-256 hash of the upstream subDAG fingerprint
+  * @param fingerprintJson Canonical JSON of the upstream subDAG
+  * @param resultUri URI of the materialized output
+  * @param tupleCount Number of tuples in the cached output (optional)
+  * @param sourceExecutionId Execution ID that produced this cache entry (optional)
+  *
+  * Note: updated_at timestamp is managed by the database (DEFAULT now())
+  */
+final case class OperatorPortCacheRecord(
+    workflowId: Long,
+    globalPortId: String,
+    subdagHash: String,
+    fingerprintJson: String,
+    resultUri: URI,
+    tupleCount: Option[Long],
+    sourceExecutionId: Option[Long]
+)
+
+/**
+  * Data Access Object for operator_port_cache table.
+  * Provides low-level get / upsert / delete operations using Jooq.
+  *
+  * @param sqlServer SqlServer instance for database access
+  */
+class OperatorPortCacheDao(sqlServer: SqlServer) {
+  private val context: DSLContext = sqlServer.createDSLContext()
+
+  /**
+    * Narrow a Long workflow ID to the Int used for the workflow_id column.
+    *
+    * A plain `.toInt` silently wraps out-of-range values, which could make a
+    * lookup or a delete target a different workflow's rows; `toIntExact`
+    * fails loudly instead.
+    *
+    * @param workflowId Workflow ID as carried by the service layer
+    * @return the same ID as an Int
+    * @throws ArithmeticException if `workflowId` does not fit in an Int
+    */
+  private def toWorkflowKey(workflowId: Long): Int =
+    java.lang.Math.toIntExact(workflowId)
+
+  /**
+    * Retrieve a cache entry by primary key
+    * (workflow_id, global_port_id, subdag_hash).
+    *
+    * @param workflowId Workflow ID
+    * @param serializedPortId Serialized GlobalPortIdentity string
+    * @param subdagHash SHA-256 hash of the upstream subDAG fingerprint
+    * @return Some(OperatorPortCacheRecord) if found, None otherwise
+    * @throws ArithmeticException if `workflowId` does not fit in an Int
+    */
+  def get(
+      workflowId: Long,
+      serializedPortId: String,
+      subdagHash: String
+  ): Option[OperatorPortCacheRecord] = {
+    context
+      .select(
+        OPERATOR_PORT_CACHE.WORKFLOW_ID,
+        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
+        OPERATOR_PORT_CACHE.SUBDAG_HASH,
+        OPERATOR_PORT_CACHE.FINGERPRINT_JSON,
+        OPERATOR_PORT_CACHE.RESULT_URI,
+        OPERATOR_PORT_CACHE.TUPLE_COUNT,
+        OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID
+      )
+      .from(OPERATOR_PORT_CACHE)
+      .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(toWorkflowKey(workflowId)))
+      .and(OPERATOR_PORT_CACHE.GLOBAL_PORT_ID.eq(serializedPortId))
+      .and(OPERATOR_PORT_CACHE.SUBDAG_HASH.eq(subdagHash))
+      .fetchOptional()
+      .toScala
+      .map { record =>
+        OperatorPortCacheRecord(
+          workflowId = record.value1().longValue(),
+          globalPortId = record.value2(),
+          subdagHash = record.value3(),
+          fingerprintJson = record.value4(),
+          resultUri = URI.create(record.value5()),
+          // tuple_count and source_execution_id may be NULL; Option(...) maps null to None.
+          tupleCount = Option(record.value6()).map(_.longValue()),
+          sourceExecutionId = Option(record.value7()).map(_.longValue())
+        )
+      }
+  }
+
+  /**
+    * Insert or update a cache entry (upsert).
+    * On conflict (workflow_id, global_port_id, subdag_hash), overwrites
+    * result_uri, fingerprint_json, tuple_count and source_execution_id of the
+    * existing row. An absent tupleCount / sourceExecutionId is written as NULL.
+    *
+    * NOTE(review): the conflict branch does not assign updated_at. If that
+    * column relies only on DEFAULT now(), the default applies on INSERT and the
+    * timestamp will not advance on update -- confirm a trigger exists or set it here.
+    *
+    * @param record OperatorPortCacheRecord to insert/update
+    * @throws ArithmeticException if `record.workflowId` does not fit in an Int
+    */
+  def upsert(record: OperatorPortCacheRecord): Unit = {
+    val dbRecord = context.newRecord(OPERATOR_PORT_CACHE)
+    dbRecord.setWorkflowId(toWorkflowKey(record.workflowId))
+    dbRecord.setGlobalPortId(record.globalPortId)
+    dbRecord.setSubdagHash(record.subdagHash)
+    dbRecord.setFingerprintJson(record.fingerprintJson)
+    dbRecord.setResultUri(record.resultUri.toString)
+    // Optional fields are only set when present; otherwise they stay null on dbRecord.
+    record.tupleCount.foreach(c => dbRecord.setTupleCount(Long.box(c)))
+    record.sourceExecutionId.foreach(eid => dbRecord.setSourceExecutionId(Long.box(eid)))
+
+    context
+      .insertInto(OPERATOR_PORT_CACHE)
+      .set(dbRecord)
+      .onConflict(
+        OPERATOR_PORT_CACHE.WORKFLOW_ID,
+        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
+        OPERATOR_PORT_CACHE.SUBDAG_HASH
+      )
+      .doUpdate()
+      .set(OPERATOR_PORT_CACHE.RESULT_URI, dbRecord.getResultUri)
+      .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, dbRecord.getFingerprintJson)
+      .set(OPERATOR_PORT_CACHE.TUPLE_COUNT, dbRecord.getTupleCount)
+      .set(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, dbRecord.getSourceExecutionId)
+      .execute()
+  }
+
+  /**
+    * Delete all cache entries for a specific workflow.
+    * Useful for cache invalidation when a workflow is deleted or manually
+    * cleared.
+    *
+    * @param workflowId Workflow ID whose cache entries should be deleted
+    * @throws ArithmeticException if `workflowId` does not fit in an Int
+    */
+  def deleteByWorkflow(workflowId: Long): Unit = {
+    context
+      .deleteFrom(OPERATOR_PORT_CACHE)
+      .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(toWorkflowKey(workflowId)))
+      .execute()
+  }
+}
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala
new file mode 100644
index 0000000000..ee5ceeafe9
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.service
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.texera.amber.core.workflow.{PhysicalPlan, WorkflowContext}
+import org.apache.texera.amber.engine.architecture.controller.PortMaterialized
+import org.apache.texera.amber.engine.common.client.AmberClient
+import org.apache.texera.web.SubscriptionManager
+
+/**
+  * Service that listens for port materialization events from the controller
+  * and persists cache metadata.
+  *
+  * The callback is registered as soon as an instance is constructed and stays
+  * active until the subscription is removed through SubscriptionManager.
+  *
+  * @param client AmberClient to register callbacks
+  * @param cacheService OperatorPortCacheService for cache persistence
+  * @param workflowContext WorkflowContext for workflow/execution IDs
+  * @param physicalPlan PhysicalPlan for fingerprint computation
+  */
+class ExecutionCacheService(
+    client: AmberClient,
+    cacheService: OperatorPortCacheService,
+    workflowContext: WorkflowContext,
+    physicalPlan: PhysicalPlan
+) extends SubscriptionManager
+    with LazyLogging {
+
+  registerCallbacks()
+
+  /**
+    * Subscribe to PortMaterialized events: log each one and upsert the
+    * corresponding cache entry through `cacheService`.
+    */
+  private def registerCallbacks(): Unit = {
+    addSubscription(
+      client
+        .registerCallback[PortMaterialized]((evt: PortMaterialized) => {
+          logger.info(
+            s"Port materialized: ${evt.portId}, URI: ${evt.resultUri}, tuple count: ${evt.tupleCount}"
+          )
+          try {
+            cacheService.upsertCachedOutput(
+              workflowContext.workflowId,
+              workflowContext.executionId,
+              evt.portId,
+              physicalPlan,
+              evt.resultUri,
+              evt.tupleCount
+            )
+          } catch {
+            // Cache persistence is best-effort: a failed upsert is logged and
+            // dropped. Only non-fatal errors are handled here so that fatal ones
+            // (e.g. OutOfMemoryError, InterruptedException) still propagate.
+            case scala.util.control.NonFatal(e) =>
+              logger.error(s"Failed to upsert cache for port ${evt.portId}", e)
+          }
+        })
+    )
+  }
+}
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
new file mode 100644
index 0000000000..9e0155e3a4
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.service
+
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.cache.FingerprintUtil
+import org.apache.texera.amber.core.workflow.{CachedOutput, 
GlobalPortIdentity, PhysicalPlan}
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
+import org.apache.texera.web.dao.{OperatorPortCacheDao, 
OperatorPortCacheRecord}
+
+import java.net.URI
+
+/**
+  * Service for operator port result caching.
+  * Sits on top of OperatorPortCacheDao and adds the fingerprint handling
+  * needed to key cache entries.
+  *
+  * Responsibilities:
+  * - Look up cached outputs for every output port of a physical plan
+  * - Create / refresh a cache entry for a materialized output port
+  * - Drop all cache entries of a workflow
+  * - (Placeholder) cost-aware eviction
+  *
+  * @param dao OperatorPortCacheDao for database access
+  */
+class OperatorPortCacheService(dao: OperatorPortCacheDao) {
+
+  /**
+    * Look up cached outputs for all output ports in the physical plan.
+    * Used by WorkflowExecutionService.
+    *
+    * For each output port of each operator:
+    * 1. Compute the fingerprint of its upstream subDAG
+    * 2. Query the DAO by (workflow id, serialized port id, subdag hash)
+    * 3. Keep the port only when the DAO returns an entry
+    *
+    * @param workflowId Workflow ID to look up cache entries for
+    * @param physicalPlan Physical plan containing operators and ports
+    * @return Map from GlobalPortIdentity to CachedOutput for all cache hits
+    */
+  def lookupCachedOutputs(
+      workflowId: WorkflowIdentity,
+      physicalPlan: PhysicalPlan
+  ): Map[GlobalPortIdentity, CachedOutput] = {
+    val outputPortIds = physicalPlan.operators.flatMap { op =>
+      op.outputPorts.keys.map(localPortId => GlobalPortIdentity(op.id, localPortId))
+    }
+    val cacheHits = outputPortIds.flatMap { globalPortId =>
+      val subdagHash =
+        FingerprintUtil.computeSubdagFingerprint(physicalPlan, globalPortId).subdagHash
+      dao
+        .get(workflowId.id, globalPortId.serializeAsString, subdagHash)
+        .map { hit =>
+          globalPortId -> CachedOutput(
+            resultUri = hit.resultUri,
+            fingerprintJson = hit.fingerprintJson,
+            tupleCount = hit.tupleCount,
+            sourceExecutionId = hit.sourceExecutionId.map(ExecutionIdentity(_))
+          )
+        }
+    }
+    cacheHits.toMap
+  }
+
+  /**
+    * Upsert the cache entry for a materialized output port.
+    * Invoked by ExecutionCacheService when a PortMaterialized event arrives.
+    *
+    * Steps:
+    * 1. Compute the fingerprint of the port's upstream subDAG
+    * 2. Upsert a record with the fingerprint, URI and metadata through the DAO
+    *
+    * @param workflowId Workflow ID
+    * @param executionId Execution ID that produced this output
+    * @param portId GlobalPortIdentity of the materialized port
+    * @param physicalPlan Physical plan (needed for fingerprint computation)
+    * @param resultUri URI where the materialized output is stored
+    * @param tupleCount Number of tuples in the output (optional)
+    */
+  def upsertCachedOutput(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity,
+      portId: GlobalPortIdentity,
+      physicalPlan: PhysicalPlan,
+      resultUri: URI,
+      tupleCount: Option[Long]
+  ): Unit = {
+    val fp = FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId)
+    val entry = OperatorPortCacheRecord(
+      workflowId = workflowId.id,
+      globalPortId = portId.serializeAsString,
+      subdagHash = fp.subdagHash,
+      fingerprintJson = fp.fingerprintJson,
+      resultUri = resultUri,
+      tupleCount = tupleCount,
+      sourceExecutionId = Some(executionId.id)
+    )
+    dao.upsert(entry)
+  }
+
+  /**
+    * Remove every cache entry that belongs to a workflow.
+    * Intended for manual cache clearing, workflow deletion, and tests.
+    *
+    * @param workflowId Workflow ID whose cache entries should be deleted
+    */
+  def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit =
+    dao.deleteByWorkflow(workflowId.id)
+
+  /**
+    * Placeholder for cost-aware eviction when a storage quota is exceeded
+    * (Phase 3: Lifecycle management research). Not implemented: every call throws.
+    *
+    * Proposed approach:
+    * - Calculate recompute_cost / storage_cost ratio for each cache entry
+    * - Evict entries with lowest ratio first
+    * - Use runtime_statistics table for cost estimation
+    *
+    * @param quotaBytes Storage quota in bytes
+    * @throws UnsupportedOperationException always
+    */
+  def evictLowValueEntries(quotaBytes: Long): Unit =
+    throw new UnsupportedOperationException(
+      "Cost-aware eviction not yet implemented (Phase 3: Lifecycle management)"
+    )
+}
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
index ade57d403b..38eef83ad6 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
@@ -28,14 +28,9 @@ import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAg
 import org.apache.texera.amber.engine.common.Utils
 import org.apache.texera.amber.engine.common.client.AmberClient
 import 
org.apache.texera.amber.engine.common.executionruntimestate.ExecutionMetadataStore
-import org.apache.texera.amber.core.workflow.cache.FingerprintUtil
 import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity}
 import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
-import org.apache.texera.web.model.websocket.event.{
-  TexeraWebSocketEvent,
-  WorkflowErrorEvent,
-  WorkflowStateEvent
-}
+import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, 
WorkflowErrorEvent, WorkflowStateEvent}
 import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
 import org.apache.texera.web.storage.ExecutionStateStore
@@ -61,6 +56,7 @@ class WorkflowExecutionService(
     controllerConfig: ControllerConfig,
     val workflowContext: WorkflowContext,
     resultService: ExecutionResultService,
+    cacheService: OperatorPortCacheService,
     request: WorkflowExecuteRequest,
     val executionStateStore: ExecutionStateStore,
     errorHandler: Throwable => Unit,
@@ -105,22 +101,14 @@ class WorkflowExecutionService(
   var executionStatsService: ExecutionStatsService = _
   var executionRuntimeService: ExecutionRuntimeService = _
   var executionConsoleService: ExecutionConsoleService = _
+  var executionCacheService: ExecutionCacheService = _
 
   private def computeCachedOutputs(
       physicalPlan: org.apache.texera.amber.core.workflow.PhysicalPlan
   ): Map[String, CachedOutput] = {
-    physicalPlan.operators
-      .flatMap(op => op.outputPorts.keys.map(pid => GlobalPortIdentity(op.id, 
pid)))
-      .flatMap { gpid =>
-        val fingerprint = 
FingerprintUtil.computeSubdagFingerprint(physicalPlan, gpid)
-        WorkflowExecutionsResource
-          .getOperatorPortCache(workflowContext.workflowId, gpid, 
fingerprint.subdagHash)
-          .map {
-            case (uri, tupleCount, fpJson, sourceEid) =>
-              gpid.serializeAsString -> CachedOutput(uri, fpJson, tupleCount, 
sourceEid)
-          }
-      }
-      .toMap
+    cacheService
+      .lookupCachedOutputs(workflowContext.workflowId, physicalPlan)
+      .map { case (gpid, cached) => gpid.serializeAsString -> cached }
   }
 
   def executeWorkflow(): Unit = {
@@ -144,6 +132,8 @@ class WorkflowExecutionService(
     executionReconfigurationService =
       new ExecutionReconfigurationService(client, executionStateStore, 
workflow)
     executionStatsService = new ExecutionStatsService(client, 
executionStateStore, workflow.context)
+    executionCacheService =
+      new ExecutionCacheService(client, cacheService, workflow.context, 
workflow.physicalPlan)
     executionRuntimeService = new ExecutionRuntimeService(
       client,
       executionStateStore,
@@ -196,6 +186,7 @@ class WorkflowExecutionService(
       executionRuntimeService.unsubscribeAll()
       executionConsoleService.unsubscribeAll()
       executionStatsService.unsubscribeAll()
+      executionCacheService.unsubscribeAll()
       executionReconfigurationService.unsubscribeAll()
     }
 
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index e35a707aeb..1756ff8032 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -107,6 +107,12 @@ class WorkflowService(
 
   val resultService: ExecutionResultService =
     new ExecutionResultService(workflowId, computingUnitId, stateStore)
+  val cacheService: OperatorPortCacheService = {
+    val dao = new org.apache.texera.web.dao.OperatorPortCacheDao(
+      org.apache.texera.dao.SqlServer.getInstance()
+    )
+    new OperatorPortCacheService(dao)
+  }
   val lifeCycleManager: WorkflowLifecycleManager = new 
WorkflowLifecycleManager(
     s"workflowId=$workflowId",
     cleanUpTimeout,
@@ -275,6 +281,7 @@ class WorkflowService(
         controllerConf,
         workflowContext,
         resultService,
+        cacheService,
         req,
         executionStateStore,
         errorHandler,
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index cc1d760437..f77fe2653c 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -51,9 +51,10 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage 
collection are research
   - `fingerprint_json`: canonical JSON of the upstream sub‑DAG.
   - `subdag_hash`: SHA-256 of `fingerprint_json`.
   - `result_uri`: materialization URI.
-  - `tuple_count` (optional), `source_execution_id` (optional), timestamps.
+  - `tuple_count` (optional), `source_execution_id` (optional).
+  - `updated_at`: TIMESTAMPTZ managed by database (DEFAULT now()).
 - `global_port_id` uses existing `GlobalPortIdentity` serialization.
-- Status: schema + migration added (`sql/texera_ddl.sql`, 
`sql/updates/16.sql`).
+- Status: schema + migration added (`sql/updates/cache.sql`).
 
 ## Fingerprint
 - Utility: `FingerprintUtil.computeSubdagFingerprint(physicalPlan, 
globalPortId) -> (fingerprintJson, subdagHash)`.
@@ -122,18 +123,24 @@ Entry point: `RegionExecutionCoordinator` constructor 
branches on `region.cached
 7. **Set phase to Completed**: Region lifecycle completes immediately
 
 #### ToExecute Regions (Normal Execution)
-**Location**: `PortCompletedHandler` → `OperatorPortCacheService` → 
`OperatorPortCacheDao`
+**Location**: `PortCompletedHandler` → `PortMaterialized` event → 
`ExecutionCacheService` → `OperatorPortCacheService` → `OperatorPortCacheDao`
 
 1. **Execute operators**: Normal execution path via worker actors
 2. **On output port completion** (`PortCompletedHandler`):
-   - Call `OperatorPortCacheService.upsertCachedOutput(...)`:
-     - Compute fingerprint via 
`FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId)`
-     - Retrieve tuple count (best-effort via 
`DocumentFactory.openDocument(uri).getCount`)
-     - Upsert to `operator_port_cache` table via DAO with:
+   - Retrieve result URI from 
`WorkflowExecutionsResource.getResultUriByPhysicalPortId`
+   - Retrieve tuple count (best-effort via 
`DocumentFactory.openDocument(uri).getCount`)
+   - Send `PortMaterialized(portId, resultUri, tupleCount)` event to client 
via `sendToClient()`
+3. **Service layer** (`ExecutionCacheService`):
+   - Registered callback via `client.registerCallback[PortMaterialized]` 
receives event
+   - Calls `OperatorPortCacheService.upsertCachedOutput(...)`:
+     - Computes fingerprint via 
`FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId)`
+     - Upserts to `operator_port_cache` table via DAO with:
        - `workflow_id`, `global_port_id`, `subdag_hash`
        - `fingerprint_json`, `result_uri`
        - `tuple_count`, `source_execution_id`, timestamps
-3. **Normal stats emission**: Real execution metrics sent to client
+4. **Normal stats emission**: Real execution metrics sent to client
+
+**Architecture note**: Event-based communication follows the existing controller 
pattern: the handler emits events via `sendToClient()`, and the service layer registers 
callbacks to handle them. This keeps cache persistence out of the engine layer; the handler 
only resolves the result URI (via `WorkflowExecutionsResource`) and the tuple count before emitting the event.
 
 ### 4. Client-Side State Management
 **Location**: `ExecutionStatsService`, `ExecutionStateStore`
@@ -218,260 +225,25 @@ HTTP endpoints for external access (if needed):
 
 **Note**: Internal services use `OperatorPortCacheService`, not the REST 
resource.
 
-### 6. Implementation Guide
+### 6. Implemented Components Reference
 
-#### Step 1: Create OperatorPortCacheDao
-**File**: 
`/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala`
+Phase 1.1 Service/DAO architecture is complete. Key components:
 
-```scala
-package org.apache.texera.web.dao
-
-import org.apache.texera.amber.core.virtualidentity.ExecutionIdentity
-import org.apache.texera.dao.SqlServer
-import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_CACHE
-import org.jooq.DSLContext
-
-import java.net.URI
-import scala.jdk.OptionConverters._
-
-case class OperatorPortCacheRecord(
-    workflowId: Long,
-    globalPortId: String,
-    subdagHash: String,
-    fingerprintJson: String,
-    resultUri: URI,
-    tupleCount: Option[Long],
-    sourceExecutionId: Option[Long],
-    createdAt: Long,
-    updatedAt: Long
-)
+| Component | Location | Purpose |
+|-----------|----------|---------|
+| **OperatorPortCacheDao** | 
`/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` | 
Low-level database access using Jooq. Methods: `get()`, `upsert()`, 
`deleteByWorkflow()` |
+| **OperatorPortCacheService** | 
`/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
 | High-level cache operations. Methods: `lookupCachedOutputs()`, 
`upsertCachedOutput()`, `invalidateWorkflowCache()` |
+| **ExecutionCacheService** | 
`/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala`
 | Event listener that registers callback for `PortMaterialized` events and 
bridges to service layer |
+| **PortMaterialized Event** | 
`/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala`
 | Client event emitted when output port completes with URI and tuple count |
 
-class OperatorPortCacheDao(sqlServer: SqlServer) {
-  private val context: DSLContext = sqlServer.createDSLContext()
+**Integration Points**:
+- **WorkflowService**: Instantiates `cacheService` at workflow level (shared 
across executions)
+- **WorkflowExecutionService**:
+  - Uses `cacheService.lookupCachedOutputs()` at submission time
+  - Instantiates `executionCacheService` per execution for cache writes
+- **PortCompletedHandler**: Emits `PortMaterialized` event via 
`sendToClient()` when output ports complete
 
-  def get(
-      workflowId: Long,
-      serializedPortId: String,
-      subdagHash: String
-  ): Option[OperatorPortCacheRecord] = {
-    context
-      .select(
-        OPERATOR_PORT_CACHE.WORKFLOW_ID,
-        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
-        OPERATOR_PORT_CACHE.SUBDAG_HASH,
-        OPERATOR_PORT_CACHE.FINGERPRINT_JSON,
-        OPERATOR_PORT_CACHE.RESULT_URI,
-        OPERATOR_PORT_CACHE.TUPLE_COUNT,
-        OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID,
-        OPERATOR_PORT_CACHE.CREATED_AT,
-        OPERATOR_PORT_CACHE.UPDATED_AT
-      )
-      .from(OPERATOR_PORT_CACHE)
-      .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt))
-      .and(OPERATOR_PORT_CACHE.GLOBAL_PORT_ID.eq(serializedPortId))
-      .and(OPERATOR_PORT_CACHE.SUBDAG_HASH.eq(subdagHash))
-      .fetchOptional()
-      .toScala
-      .map { record =>
-        OperatorPortCacheRecord(
-          workflowId = record.value1().longValue(),
-          globalPortId = record.value2(),
-          subdagHash = record.value3(),
-          fingerprintJson = record.value4(),
-          resultUri = URI.create(record.value5()),
-          tupleCount = Option(record.value6()).map(_.longValue()),
-          sourceExecutionId = Option(record.value7()).map(_.longValue()),
-          createdAt = record.value8().longValue(),
-          updatedAt = record.value9().longValue()
-        )
-      }
-  }
-
-  def upsert(record: OperatorPortCacheRecord): Unit = {
-    val dbRecord = context.newRecord(OPERATOR_PORT_CACHE)
-    dbRecord.setWorkflowId(record.workflowId.toInt)
-    dbRecord.setGlobalPortId(record.globalPortId)
-    dbRecord.setSubdagHash(record.subdagHash)
-    dbRecord.setFingerprintJson(record.fingerprintJson)
-    dbRecord.setResultUri(record.resultUri.toString)
-    record.tupleCount.foreach(c => dbRecord.setTupleCount(Long.box(c)))
-    record.sourceExecutionId.foreach(eid => 
dbRecord.setSourceExecutionId(Long.box(eid)))
-
-    context
-      .insertInto(OPERATOR_PORT_CACHE)
-      .set(dbRecord)
-      .onConflict(
-        OPERATOR_PORT_CACHE.WORKFLOW_ID,
-        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
-        OPERATOR_PORT_CACHE.SUBDAG_HASH
-      )
-      .doUpdate()
-      .set(OPERATOR_PORT_CACHE.RESULT_URI, dbRecord.getResultUri)
-      .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, dbRecord.getFingerprintJson)
-      .set(OPERATOR_PORT_CACHE.TUPLE_COUNT, dbRecord.getTupleCount)
-      .set(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, 
dbRecord.getSourceExecutionId)
-      .execute()
-  }
-
-  def deleteByWorkflow(workflowId: Long): Unit = {
-    context
-      .deleteFrom(OPERATOR_PORT_CACHE)
-      .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt))
-      .execute()
-  }
-}
-```
-
-#### Step 2: Create OperatorPortCacheService
-**File**: 
`/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
-
-```scala
-package org.apache.texera.web.service
-
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
-import org.apache.texera.amber.core.workflow.cache.FingerprintUtil
-import org.apache.texera.amber.core.workflow.{CachedOutput, 
GlobalPortIdentity, PhysicalPlan}
-import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
-import org.apache.texera.web.dao.{OperatorPortCacheDao, 
OperatorPortCacheRecord}
-
-import java.net.URI
-
-class OperatorPortCacheService(dao: OperatorPortCacheDao) {
-
-  /**
-    * Lookup cached outputs for all materializable ports in the physical plan.
-    * Called at workflow submission time.
-    */
-  def lookupCachedOutputs(
-      workflowId: WorkflowIdentity,
-      physicalPlan: PhysicalPlan
-  ): Map[GlobalPortIdentity, CachedOutput] = {
-    physicalPlan.operators
-      .flatMap(op => op.outputPorts.keys.map(pid => GlobalPortIdentity(op.id, 
pid)))
-      .flatMap { gpid =>
-        val fingerprint = 
FingerprintUtil.computeSubdagFingerprint(physicalPlan, gpid)
-        dao.get(workflowId.id, gpid.serializeAsString, 
fingerprint.subdagHash).map { record =>
-          gpid -> CachedOutput(
-            resultUri = record.resultUri,
-            fingerprintJson = record.fingerprintJson,
-            tupleCount = record.tupleCount,
-            sourceExecutionId = 
record.sourceExecutionId.map(ExecutionIdentity(_))
-          )
-        }
-      }
-      .toMap
-  }
-
-  /**
-    * Upsert cache entry when an output port completes.
-    * Called by PortCompletedHandler at runtime.
-    */
-  def upsertCachedOutput(
-      workflowId: WorkflowIdentity,
-      executionId: ExecutionIdentity,
-      portId: GlobalPortIdentity,
-      physicalPlan: PhysicalPlan,
-      resultUri: URI,
-      tupleCount: Option[Long]
-  ): Unit = {
-    val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, 
portId)
-    val now = System.currentTimeMillis()
-
-    dao.upsert(
-      OperatorPortCacheRecord(
-        workflowId = workflowId.id,
-        globalPortId = portId.serializeAsString,
-        subdagHash = fingerprint.subdagHash,
-        fingerprintJson = fingerprint.fingerprintJson,
-        resultUri = resultUri,
-        tupleCount = tupleCount,
-        sourceExecutionId = Some(executionId.id),
-        createdAt = now,
-        updatedAt = now
-      )
-    )
-  }
-
-  /**
-    * Invalidate all cache entries for a workflow.
-    * Useful for manual cache clearing or workflow deletion.
-    */
-  def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit = {
-    dao.deleteByWorkflow(workflowId.id)
-  }
-
-  /**
-    * Future: Cost-aware eviction when storage quota is exceeded.
-    */
-  def evictLowValueEntries(quotaBytes: Long): Unit = {
-    // Phase 3: Lifecycle management
-    throw new UnsupportedOperationException("Not yet implemented")
-  }
-}
-```
-
-#### Step 3: Update WorkflowExecutionService
-**File**: 
`/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala`
-
-```scala
-class WorkflowExecutionService(
-    controllerConfig: ControllerConfig,
-    val workflowContext: WorkflowContext,
-    resultService: ExecutionResultService,
-    cacheService: OperatorPortCacheService,  // INJECT HERE
-    request: WorkflowExecuteRequest,
-    val executionStateStore: ExecutionStateStore,
-    errorHandler: Throwable => Unit,
-    userEmailOpt: Option[String],
-    sessionUri: URI
-) extends SubscriptionManager with LazyLogging {
-
-  def executeWorkflow(): Unit = {
-    try {
-      workflow = new 
WorkflowCompiler(workflowContext).compile(request.logicalPlan)
-
-      // Use cache service for lookup
-      val cachedOutputs = cacheService
-        .lookupCachedOutputs(workflowContext.workflowId, workflow.physicalPlan)
-        .map { case (gpid, cached) => gpid.serializeAsString -> cached }
-
-      workflowContext.workflowSettings =
-        workflowContext.workflowSettings.copy(cachedOutputs = cachedOutputs)
-    } catch {
-      case err: Throwable => errorHandler(err)
-    }
-    // ... rest of method
-  }
-}
-```
-
-#### Step 4: Update PortCompletedHandler
-**File**: 
`/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala`
-
-```scala
-// Inject OperatorPortCacheService into controller/handler
-
-(storageUriOpt, Option(cp.workflowScheduler.physicalPlan)) match {
-  case (Some(uri), Some(plan)) =>
-    val tupleCount = try {
-      Some(DocumentFactory.openDocument(uri)._1.getCount)
-    } catch {
-      case _: Throwable => None
-    }
-
-    // Use cache service for upsert
-    cacheService.upsertCachedOutput(
-      cp.workflowContext.workflowId,
-      cp.workflowContext.executionId,
-      globalPortId,
-      plan,
-      uri,
-      tupleCount
-    )
-  case _ => // no-op
-}
-```
+**Architecture**: Event-based communication follows existing patterns 
(ExecutionStatsUpdate, WorkerAssignmentUpdate). Engine layer has zero knowledge 
of web/service layer.
 
 ### 7. Testing Strategy
 - **Unit tests**: Fingerprint determinism, cost model logic, region 
classification
@@ -483,33 +255,37 @@ class WorkflowExecutionService(
 
 ### Architecture Layers
 
-**Current (Prototype)**:
+**Clean Architecture (Implemented)**:
 ```
-WorkflowExecutionService ───┐
-                            ├──→ WorkflowExecutionsResource (Jooq)
-PortCompletedHandler ───────┘
+WorkflowExecutionService ──→ lookupCachedOutputs()
+                            ↓
+ExecutionCacheService ────→ upsertCachedOutput()     OperatorPortCacheService
+    ↑                           ↓                             ↓
+    └─ registerCallback() ──────┘                    OperatorPortCacheDao 
(Jooq)
+         (PortMaterialized event)                             ↓
+                                                        operator_port_cache 
table
 ```
 
-**Proposed (Clean Architecture)**:
-```
-WorkflowExecutionService ───┐
-                            ├──→ OperatorPortCacheService ──→ 
OperatorPortCacheDao (Jooq)
-PortCompletedHandler ───────┤
-                            │
-WorkflowExecutionsResource ─┘ (optional REST endpoints)
-```
+**Event-based communication flow**:
+1. `PortCompletedHandler` emits `PortMaterialized` event via `sendToClient()`
+2. `ExecutionCacheService` registers callback via 
`client.registerCallback[PortMaterialized]`
+3. Callback invokes `OperatorPortCacheService.upsertCachedOutput()`
+4. Service calls `OperatorPortCacheDao.upsert()` for database persistence
 
-**Refactoring needed**:
-1. Extract Jooq code from `WorkflowExecutionsResource` → `OperatorPortCacheDao`
-2. Create `OperatorPortCacheService` with workflow-level abstractions
-3. Update service call sites to use `OperatorPortCacheService`
+**Clean layering**: Engine layer (PortCompletedHandler) has zero knowledge of 
web/service layer. Event-based pattern matches existing controller 
communication (ExecutionStatsUpdate, WorkerAssignmentUpdate, etc.).
 
 ### Completed Components
-- **Schema/migration**: `operator_port_cache` table added 
(`sql/texera_ddl.sql`, `sql/updates/16.sql`)
+- **Schema/migration**: `operator_port_cache` table added 
(`sql/updates/cache.sql`)
+  - Columns: `workflow_id`, `global_port_id`, `subdag_hash` (composite PK), 
`fingerprint_json`, `result_uri`, `tuple_count`, `source_execution_id`, 
`updated_at`
+  - The `updated_at` timestamp is managed by the database (`DEFAULT now()`)
+- **Service/DAO architecture** (Phase 1.1 - Complete):
+  - `OperatorPortCacheDao`: Low-level database access with get/upsert/delete 
methods
+  - `OperatorPortCacheService`: High-level cache operations 
(lookupCachedOutputs, upsertCachedOutput, invalidateWorkflowCache)
+  - `ExecutionCacheService`: Event listener bridging controller events to 
service layer
+  - Event-based communication via `PortMaterialized` event and 
`client.registerCallback[T]`
 - **Fingerprinting**: `FingerprintUtil` implemented with workflow-based specs 
for deterministic subDAG hashing
-- **Submission-time lookup**: `WorkflowExecutionService` computes fingerprints 
for all physical output ports, queries cache, stores hits in 
`WorkflowSettings.cachedOutputs`
-- **Cache persistence**: `PortCompletedHandler` upserts to 
`operator_port_cache` on output port completion (includes fingerprint, URI, 
tuple count)
-- **API layer**: `WorkflowExecutionsResource` exposes cache lookup/upsert 
helpers
+- **Submission-time lookup**: `WorkflowExecutionService` uses 
`OperatorPortCacheService.lookupCachedOutputs()` to compute fingerprints for 
all physical output ports, queries cache, stores hits in 
`WorkflowSettings.cachedOutputs`
+- **Cache persistence**: `PortCompletedHandler` emits `PortMaterialized` event 
→ `ExecutionCacheService` → `OperatorPortCacheService.upsertCachedOutput()` → 
`OperatorPortCacheDao.upsert()` (includes fingerprint, URI, tuple count)
 - **Scheduler integration**: `CostBasedScheduleGenerator` marks regions cached 
when all required outputs have hits, reuses cached URIs in port configs
 - **Runtime execution**: `RegionExecutionCoordinator` branches on 
`region.cached` flag:
   - ToSkip regions: `completeCachedRegion()` creates shallow state hierarchy, 
emits synthetic stats (numWorkers=0, processingTime=0), propagates cached URIs 
downstream
@@ -586,18 +362,31 @@ The cache system integrates with three layers:
 
 ### Phase 1: Complete Prototype (Engineering)
 
-#### 1.1 Refactor to Service/DAO Architecture
-- [ ] Create `OperatorPortCacheDao` with get/upsert/delete methods
-  - Extract Jooq code from `WorkflowExecutionsResource`
-  - Define `OperatorPortCacheRecord` case class
-  - Add unit tests for DAO operations
-- [ ] Create `OperatorPortCacheService` with high-level methods
+#### 1.1 Refactor to Service/DAO Architecture ✓ COMPLETE (DAO unit tests and optional REST endpoints still open)
+- [x] Create `OperatorPortCacheDao` with get/upsert/delete methods
+  - Extracted Jooq code into dedicated DAO layer
+  - Defined `OperatorPortCacheRecord` case class matching database schema
+  - Location: 
`/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala`
+- [x] Create `OperatorPortCacheService` with high-level methods
   - `lookupCachedOutputs(workflowId, physicalPlan)`: batch lookup at submission
   - `upsertCachedOutput(...)`: cache write on port completion
   - `invalidateWorkflowCache(workflowId)`: manual invalidation
-  - Encapsulate fingerprint computation and serialization
-- [ ] Refactor `WorkflowExecutionService.computeCachedOutputs()` to use service
-- [ ] Refactor `PortCompletedHandler` to use service
+  - Encapsulates fingerprint computation and serialization
+  - Location: 
`/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala`
+- [x] Create `ExecutionCacheService` for event handling
+  - Registers callback for `PortMaterialized` events
+  - Bridges controller events to service layer
+  - Location: 
`/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala`
+- [x] Add `PortMaterialized` event type
+  - Location: 
`/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala`
+- [x] Refactor `WorkflowExecutionService.computeCachedOutputs()` to use service
+  - Uses `OperatorPortCacheService.lookupCachedOutputs()`
+- [x] Refactor `PortCompletedHandler` to emit events
+  - Emits `PortMaterialized` event via `sendToClient()` instead of direct 
service calls
+- [x] Instantiate services in `WorkflowService` and `WorkflowExecutionService`
+  - `cacheService` created at workflow level
+  - `executionCacheService` created per execution
+- [ ] Add unit tests for DAO operations
 - [ ] (Optional) Add REST endpoints in `WorkflowExecutionsResource` that 
delegate to service
 
 #### 1.2 Testing & Validation


Reply via email to