This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit ade48b9f5408dd28fe07d834da6a3bb795f5b545
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Wed Jan 14 14:29:11 2026 -0800

    feat(cache): use OperatorExecution's stats for cache upsert.
---
 .../controller/promisehandlers/PortCompletedHandler.scala  | 14 +++++++-------
 docs/operator-port-cache.md                                |  4 ++--
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
index 254948a0e1..e9439eeba1 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
@@ -21,7 +21,6 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers
 
 import com.twitter.util.Future
 import org.apache.texera.amber.core.WorkflowRuntimeException
-import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.workflow.GlobalPortIdentity
 import org.apache.texera.amber.engine.architecture.controller.{
   ControllerAsyncRPCHandlerInitializer,
@@ -85,12 +84,13 @@ trait PortCompletedHandler {
                     globalPortId
                   )
                 storageUriOpt.foreach { uri =>
-                  val tupleCount =
-                    try {
-                      Some(DocumentFactory.openDocument(uri)._1.getCount)
-                    } catch {
-                      case _: Throwable => None
-                    }
+                  // Prefer runtime statistics over storage reads for tuple counts.
+                  val tupleCount = operatorExecution
+                    .getStats
+                    .operatorStatistics
+                    .outputMetrics
+                    .find(_.portId == msg.portId)
+                    .map(_.tupleMetrics.count)
                   sendToClient(PortMaterialized(globalPortId, uri, tupleCount))
                 }
               }
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index 8fab7a1a59..de3b0471bd 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -131,7 +131,7 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached
 1. **Execute operators**: Normal execution path via worker actors
 2. **On output port completion** (`PortCompletedHandler`):
   - Retrieve result URI from `WorkflowExecutionsResource.getResultUriByPhysicalPortId`
-   - Retrieve tuple count (best-effort via `DocumentFactory.openDocument(uri).getCount`)
+  - Retrieve tuple count (best-effort via runtime stats from worker metrics)
   - Send `PortMaterialized(portId, resultUri, tupleCount)` event to client via `sendToClient()`
 3. **Service layer** (`ExecutionCacheService`):
   - Registered callback via `client.registerCallback[PortMaterialized]` receives event
@@ -216,7 +216,7 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) {
 **Key responsibilities**:
 - Encapsulates fingerprint computation (calls `FingerprintUtil`)
 - Handles `GlobalPortIdentity` ↔ String serialization
-- Manages tuple count retrieval (best-effort via `DocumentFactory`)
+- Manages tuple count propagation (best-effort via runtime stats)
 - Provides workflow-level abstractions (batch lookup, invalidation)
 
 #### WorkflowExecutionsResource (REST API - Optional)

Reply via email to