This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git
commit ade48b9f5408dd28fe07d834da6a3bb795f5b545 Author: Xiaozhen Liu <[email protected]> AuthorDate: Wed Jan 14 14:29:11 2026 -0800 feat(cache): use OperatorExecution's stats for cache upsert. --- .../controller/promisehandlers/PortCompletedHandler.scala | 14 +++++++------- docs/operator-port-cache.md | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala index 254948a0e1..e9439eeba1 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala @@ -21,7 +21,6 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers import com.twitter.util.Future import org.apache.texera.amber.core.WorkflowRuntimeException -import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.workflow.GlobalPortIdentity import org.apache.texera.amber.engine.architecture.controller.{ ControllerAsyncRPCHandlerInitializer, @@ -85,12 +84,13 @@ trait PortCompletedHandler { globalPortId ) storageUriOpt.foreach { uri => - val tupleCount = - try { - Some(DocumentFactory.openDocument(uri)._1.getCount) - } catch { - case _: Throwable => None - } + // Prefer runtime statistics over storage reads for tuple counts. 
+ val tupleCount = operatorExecution + .getStats + .operatorStatistics + .outputMetrics + .find(_.portId == msg.portId) + .map(_.tupleMetrics.count) sendToClient(PortMaterialized(globalPortId, uri, tupleCount)) } } diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index 8fab7a1a59..de3b0471bd 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -131,7 +131,7 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached 1. **Execute operators**: Normal execution path via worker actors 2. **On output port completion** (`PortCompletedHandler`): - Retrieve result URI from `WorkflowExecutionsResource.getResultUriByPhysicalPortId` - - Retrieve tuple count (best-effort via `DocumentFactory.openDocument(uri).getCount`) + - Retrieve tuple count (best-effort via runtime stats from worker metrics) - Send `PortMaterialized(portId, resultUri, tupleCount)` event to client via `sendToClient()` 3. **Service layer** (`ExecutionCacheService`): - Registered callback via `client.registerCallback[PortMaterialized]` receives event @@ -216,7 +216,7 @@ class OperatorPortCacheService(dao: OperatorPortCacheDao) { **Key responsibilities**: - Encapsulates fingerprint computation (calls `FingerprintUtil`) - Handles `GlobalPortIdentity` ↔ String serialization -- Manages tuple count retrieval (best-effort via `DocumentFactory`) +- Manages tuple count propagation (best-effort via runtime stats) - Provides workflow-level abstractions (batch lookup, invalidation) #### WorkflowExecutionsResource (REST API - Optional)
