This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 62fcb3807644d7446224d1ec80726463044ac798
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Jan 12 20:27:03 2026 -0800

    feat(cache): update cached region completion logic to avoid assigning workers.
---
 .../controller/execution/OperatorExecution.scala   | 108 ++++++++++++++-------
 .../scheduling/RegionExecutionCoordinator.scala    |  53 +++++-----
 docs/operator-port-cache.md                        |   9 +-
 3 files changed, 106 insertions(+), 64 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecution.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecution.scala
index 5a7f57083a..6e1f27274b 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecution.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecution.scala
@@ -40,6 +40,22 @@ case class OperatorExecution() {
 
   private val workerExecutions =
     new util.concurrent.ConcurrentHashMap[ActorVirtualIdentity, 
WorkerExecution]()
+  // Cached metrics for ToSkip regions; when set, operator stats/state are 
derived from this only.
+  private var cachedMetrics: Option[OperatorMetrics] = None
+
+  /**
+    * Sets cached operator metrics for a ToSkip region and bypasses 
worker-based aggregation.
+    */
+  def setCachedMetrics(metrics: OperatorMetrics): Unit = {
+    cachedMetrics = Some(metrics)
+  }
+
+  /**
+    * Clears cached operator metrics so the operator can report live worker 
stats again.
+    */
+  def clearCachedMetrics(): Unit = {
+    cachedMetrics = None
+  }
 
   /**
     * Initializes a `WorkerExecution` for the specified workerId and adds it 
to the workerExecutions map.
@@ -70,17 +86,24 @@ case class OperatorExecution() {
     */
   def getWorkerIds: Set[ActorVirtualIdentity] = 
workerExecutions.keys.asScala.toSet
 
+  /**
+    * Returns the aggregated operator state from worker executions, or the 
cached state when present.
+    */
   def getState: WorkflowAggregatedState = {
-    val workerStates = workerExecutions.values.asScala.map(_.getState)
-    aggregateStates(
-      workerStates,
-      WorkerState.COMPLETED,
-      WorkerState.TERMINATED,
-      WorkerState.RUNNING,
-      WorkerState.UNINITIALIZED,
-      WorkerState.PAUSED,
-      WorkerState.READY
-    )
+    cachedMetrics
+      .map(_.operatorState)
+      .getOrElse {
+        val workerStates = workerExecutions.values.asScala.map(_.getState)
+        aggregateStates(
+          workerStates,
+          WorkerState.COMPLETED,
+          WorkerState.TERMINATED,
+          WorkerState.RUNNING,
+          WorkerState.UNINITIALIZED,
+          WorkerState.PAUSED,
+          WorkerState.READY
+        )
+      }
   }
 
   private[this] def computeOperatorPortStats(
@@ -89,36 +112,55 @@ case class OperatorExecution() {
     ExecutionUtils.aggregatePortMetrics(workerPortStats)
   }
 
+  /**
+    * Returns operator metrics aggregated from worker executions, or cached 
metrics when set.
+    */
   def getStats: OperatorMetrics = {
-    val workerRawStats = workerExecutions.values.asScala.map(_.getStats)
-    val inputMetrics = workerRawStats.flatMap(_.inputTupleMetrics)
-    val outputMetrics = workerRawStats.flatMap(_.outputTupleMetrics)
-    OperatorMetrics(
-      getState,
-      OperatorStatistics(
-        inputMetrics = computeOperatorPortStats(inputMetrics),
-        outputMetrics = computeOperatorPortStats(outputMetrics),
-        getWorkerIds.size,
-        dataProcessingTime = workerRawStats.map(_.dataProcessingTime).sum,
-        controlProcessingTime = 
workerRawStats.map(_.controlProcessingTime).sum,
-        idleTime = workerRawStats.map(_.idleTime).sum
+    cachedMetrics.getOrElse {
+      val workerRawStats = workerExecutions.values.asScala.map(_.getStats)
+      val inputMetrics = workerRawStats.flatMap(_.inputTupleMetrics)
+      val outputMetrics = workerRawStats.flatMap(_.outputTupleMetrics)
+      OperatorMetrics(
+        getState,
+        OperatorStatistics(
+          inputMetrics = computeOperatorPortStats(inputMetrics),
+          outputMetrics = computeOperatorPortStats(outputMetrics),
+          getWorkerIds.size,
+          dataProcessingTime = workerRawStats.map(_.dataProcessingTime).sum,
+          controlProcessingTime = 
workerRawStats.map(_.controlProcessingTime).sum,
+          idleTime = workerRawStats.map(_.idleTime).sum
+        )
       )
-    )
+    }
   }
 
+  /**
+    * Returns true when all worker input ports are completed, or always true 
for cached operators.
+    */
   def isInputPortCompleted(portId: PortIdentity): Boolean = {
-    workerExecutions
-      .values()
-      .asScala
-      .map(workerExecution => workerExecution.getInputPortExecution(portId))
-      .forall(_.completed)
+    if (cachedMetrics.isDefined) {
+      true
+    } else {
+      workerExecutions
+        .values()
+        .asScala
+        .map(workerExecution => workerExecution.getInputPortExecution(portId))
+        .forall(_.completed)
+    }
   }
 
+  /**
+    * Returns true when all worker output ports are completed, or always true 
for cached operators.
+    */
   def isOutputPortCompleted(portId: PortIdentity): Boolean = {
-    workerExecutions
-      .values()
-      .asScala
-      .map(workerExecution => workerExecution.getOutputPortExecution(portId))
-      .forall(_.completed)
+    if (cachedMetrics.isDefined) {
+      true
+    } else {
+      workerExecutions
+        .values()
+        .asScala
+        .map(workerExecution => workerExecution.getOutputPortExecution(portId))
+        .forall(_.completed)
+    }
   }
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index a1bd6bdd17..2b6c9e6d71 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -29,12 +29,13 @@ import 
org.apache.texera.amber.engine.architecture.common.{AkkaActorRefMappingSe
 import 
org.apache.texera.amber.engine.architecture.controller.execution.{OperatorExecution,
 RegionExecution, WorkflowExecution}
 import 
org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, 
ExecutionStateUpdate, ExecutionStatsUpdate, WorkerAssignmentUpdate}
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
-import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
-import 
org.apache.texera.amber.engine.architecture.scheduling.config.{InputPortConfig, 
OperatorConfig, OutputPortConfig, ResourceConfig, WorkerConfig}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.{EmptyReturn, 
WorkflowAggregatedState}
+import 
org.apache.texera.amber.engine.architecture.scheduling.config.{InputPortConfig, 
OperatorConfig, OutputPortConfig, ResourceConfig}
 import 
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning
-import 
org.apache.texera.amber.engine.architecture.worker.statistics.{PortTupleMetricsMapping,
 TupleMetrics, WorkerState, WorkerStatistics}
+import 
org.apache.texera.amber.engine.architecture.worker.statistics.{PortTupleMetricsMapping,
 TupleMetrics, WorkerState}
 import org.apache.texera.amber.engine.common.AmberLogging
 import org.apache.texera.amber.engine.common.FutureBijection._
+import 
org.apache.texera.amber.engine.common.executionruntimestate.{OperatorMetrics, 
OperatorStatistics}
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.apache.texera.web.SessionState
@@ -97,38 +98,34 @@ class RegionExecutionCoordinator(
     initRegionExecution()
   }
 
+  /**
+    * Short-circuit a cached region by recording operator metrics and output 
URIs without workers,
+    * then emit stats and mark the region as completed.
+    */
   private def completeCachedRegion(): Unit = {
     val regionExecution = workflowExecution.getRegionExecution(region.id)
     val resourceConfig = region.resourceConfig.getOrElse(ResourceConfig())
     region.getOperators.foreach { op =>
       val opExecution = regionExecution.initOperatorExecution(op.id)
-      val workerConfigs = resourceConfig.operatorConfigs
-        .get(op.id)
-        .map(_.workerConfigs)
-        .getOrElse(WorkerConfig.generateWorkerConfigs(op))
-      workerConfigs.foreach { workerCfg =>
-        val workerExecution = 
opExecution.initWorkerExecution(workerCfg.workerId)
-        op.inputPorts.keys.foreach(pid => 
workerExecution.getInputPortExecution(pid).setCompleted())
-        op.outputPorts.keys.foreach(pid => 
workerExecution.getOutputPortExecution(pid).setCompleted())
-        val outputMetrics = op.outputPorts.keys.map { pid =>
-          val count = resourceConfig.portConfigs.collectFirst {
-            case (gpid, cfg: OutputPortConfig) if gpid == 
GlobalPortIdentity(op.id, pid) =>
-              cfg.cachedTupleCount.getOrElse(0L)
-          }.getOrElse(0L)
-          PortTupleMetricsMapping(pid, TupleMetrics(count, 0L))
-        }.toSeq
-        val inputMetrics = op.inputPorts.keys
-          .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L)))
-          .toSeq
-        val stats = WorkerStatistics(
+      // Cached regions do not create workers; synthesize operator-level 
metrics instead.
+      val outputMetrics = op.outputPorts.keys.map { pid =>
+        val count = resourceConfig.portConfigs.collectFirst {
+          case (gpid, cfg: OutputPortConfig) if gpid == 
GlobalPortIdentity(op.id, pid) =>
+            cfg.cachedTupleCount.getOrElse(0L)
+        }.getOrElse(0L)
+        PortTupleMetricsMapping(pid, TupleMetrics(count, 0L))
+      }.toSeq
+      val inputMetrics = op.inputPorts.keys
+        .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L)))
+        .toSeq
+      val stats = OperatorMetrics(
+        WorkflowAggregatedState.COMPLETED,
+        OperatorStatistics(
           inputMetrics,
           outputMetrics,
-          dataProcessingTime = 0L,
-          controlProcessingTime = 0L,
-          idleTime = 0L
         )
-        workerExecution.update(System.nanoTime(), WorkerState.COMPLETED, stats)
-      }
+      )
+      opExecution.setCachedMetrics(stats)
     }
     recordCachedOutputPortResults(resourceConfig)
     asyncRPCClient.sendToClient(
@@ -362,6 +359,8 @@ class RegionExecutionCoordinator(
         else
           None
       )
+      // Ensure live execution does not inherit cached operator metrics.
+      operatorExecution.clearCachedMetrics()
 
       if (!existOpExecution) {
         buildOperator(
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index f77fe2653c..940d0490fa 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -35,10 +35,10 @@ A region is either fully cached (ToSkip) or fully executed (ToExecute) — no pa
 - Cost model (compare full region costs)
 
 ### 4. Shallow State Hierarchy for Cached Regions
-ToSkip regions create lightweight state structures (Workflow → Region → Operator/Link) without Worker/Channel states. This is consistent with `numWorkers=0` and avoids synthetic worker lifecycle management.
+ToSkip regions create lightweight state structures (Workflow → Region → Operator/Link) and store cached metrics at the operator level. No Worker/Channel states are created, so `numWorkers=0` and no worker assignments are emitted.
 
 ### 5. Stats Emission via Direct Client Updates
-Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via `asyncRPCClient.sendToClient()`. This reuses existing stats infrastructure without special-casing the frontend.
+Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via `asyncRPCClient.sendToClient()`, with cached operator metrics (`numWorkers=0`). This reuses existing stats infrastructure without special-casing the frontend.
 
 ### 6. No Explicit Cache Flag in Metrics
 Cached execution is inferred from `numWorkers=0` + instant completion, rather than adding a `from_cache` flag to protobuf messages. This minimizes protocol changes.
@@ -112,8 +112,9 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached
 1. **Skip operator execution**: Call `completeCachedRegion()` immediately
 2. **State hierarchy** (shallow):
    - Create: Workflow → Region → Operator/Link states
+   - Record cached operator metrics (numWorkers=0)
    - Skip: Worker/Channel states (not needed)
-3. **Mark ports completed**: Set port status to COMPLETED with cached URI
+3. **Mark ports completed**: Treat cached operators as completed and record cached URIs
 4. **Emit synthetic stats** via `asyncRPCClient.sendToClient(ExecutionStatsUpdate(...))`:
    - `numWorkers = 0`
    - `dataProcessingTime = 0`, `controlProcessingTime = 0`, `idleTime = 0`
@@ -288,7 +289,7 @@ ExecutionCacheService ────→ upsertCachedOutput()     OperatorPortCache
 - **Cache persistence**: `PortCompletedHandler` emits `PortMaterialized` event → `ExecutionCacheService` → `OperatorPortCacheService.upsertCachedOutput()` → `OperatorPortCacheDao.upsert()` (includes fingerprint, URI, tuple count)
 - **Scheduler integration**: `CostBasedScheduleGenerator` marks regions cached when all required outputs have hits, reuses cached URIs in port configs
 - **Runtime execution**: `RegionExecutionCoordinator` branches on `region.cached` flag:
-  - ToSkip regions: `completeCachedRegion()` creates shallow state hierarchy, emits synthetic stats (numWorkers=0, processingTime=0), propagates cached URIs downstream
+  - ToSkip regions: `completeCachedRegion()` records cached operator metrics (numWorkers=0, processingTime=0) without creating workers, propagates cached URIs downstream
   - ToExecute regions: normal execution path
 - **Stats emission**: Cached regions emit `ExecutionStatsUpdate` via direct client updates, maintaining consistency with normal execution lifecycle
 

Reply via email to