This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 707dd632615b9a4a314970a088e68c43faa88c8b
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Feb 9 15:05:33 2026 -0800

    feat(cache): introduce forced cache reuse assuming reusing cache is always better.
---
 .../controller/execution/WorkflowExecution.scala   |  18 +-
 .../promisehandlers/PortCompletedHandler.scala     |  36 +-
 .../QueryWorkerStatisticsHandler.scala             |  30 +-
 .../scheduling/CostBasedScheduleGenerator.scala    | 561 ++++++++++++++-------
 .../scheduling/RegionExecutionCoordinator.scala    |  33 +-
 .../scheduling/config/PortConfig.scala             |  14 +-
 .../texera/web/dao/OperatorPortCacheDao.scala      |   3 +-
 docs/operator-port-cache.md                        | 107 ++--
 8 files changed, 550 insertions(+), 252 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
index dea9b692a4..b806479b89 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
@@ -119,10 +119,22 @@ case class WorkflowExecution() {
     * @throws NoSuchElementException if no `OperatorExecution` is found for the specified operatorId.
     */
   def getLatestOperatorExecution(physicalOpId: PhysicalOpIdentity): 
OperatorExecution = {
-    regionExecutions.values.toList
+    getLatestOperatorExecutionOption(physicalOpId).get
+  }
+
+  /**
+    * Returns the latest `OperatorExecution` for a physical operator if it has been initialized.
+    *
+    * This is the safe counterpart of `getLatestOperatorExecution` for callers that may traverse
+    * operators before their region is launched (e.g., full-graph stats queries while execution is still
+    * progressing through schedule levels).
+    */
+  def getLatestOperatorExecutionOption(
+      physicalOpId: PhysicalOpIdentity
+  ): Option[OperatorExecution] = {
+    regionExecutions.values.toSeq
       .findLast(regionExecution => 
regionExecution.hasOperatorExecution(physicalOpId))
-      .get
-      .getOperatorExecution(physicalOpId)
+      .map(_.getOperatorExecution(physicalOpId))
   }
 
   def isCompleted: Boolean = getState == WorkflowAggregatedState.COMPLETED
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
index e9439eeba1..abb8b1357a 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
@@ -33,6 +33,7 @@ import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   QueryStatisticsRequest
 }
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import 
org.apache.texera.amber.engine.architecture.scheduling.config.OutputPortConfig
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.apache.texera.amber.util.VirtualIdentityUtils
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
@@ -78,20 +79,27 @@ trait PortCompletedHandler {
             if (isPortCompleted) {
               // If this is an output port and materialized, notify client 
(for cache upsert).
               if (!msg.input) {
-                val storageUriOpt =
-                  WorkflowExecutionsResource.getResultUriByPhysicalPortId(
-                    cp.workflowContext.executionId,
-                    globalPortId
-                  )
-                storageUriOpt.foreach { uri =>
-                  // Prefer runtime statistics over storage reads for tuple 
counts.
-                  val tupleCount = operatorExecution
-                    .getStats
-                    .operatorStatistics
-                    .outputMetrics
-                    .find(_.portId == msg.portId)
-                    .map(_.tupleMetrics.count)
-                  sendToClient(PortMaterialized(globalPortId, uri, tupleCount))
+                val isMaterializedOutput = region.resourceConfig
+                  .flatMap(_.portConfigs.get(globalPortId))
+                  .collect { case cfg: OutputPortConfig => cfg.materialize }
+                  .getOrElse(false)
+
+                if (isMaterializedOutput) {
+                  val storageUriOpt =
+                    WorkflowExecutionsResource.getResultUriByPhysicalPortId(
+                      cp.workflowContext.executionId,
+                      globalPortId
+                    )
+                  storageUriOpt.foreach { uri =>
+                    // Prefer runtime statistics over storage reads for tuple 
counts.
+                    val tupleCount = operatorExecution
+                      .getStats
+                      .operatorStatistics
+                      .outputMetrics
+                      .find(_.portId == msg.portId)
+                      .map(_.tupleMetrics.count)
+                    sendToClient(PortMaterialized(globalPortId, uri, 
tupleCount))
+                  }
                 }
               }
 
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
index fb725f850e..9f21d10383 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
@@ -105,20 +105,24 @@ trait QueryWorkerStatisticsHandler {
             if (opFilter.nonEmpty && !opFilter.contains(opId)) {
               Seq.empty
             } else {
-              val exec = cp.workflowExecution.getLatestOperatorExecution(opId)
-              // Skip completed operators
-              if (exec.getState == COMPLETED) {
-                Seq.empty
-              } else {
-                // Select all workers for this operator
-                val workerIds = exec.getWorkerIds
-
-                // Send queryStatistics to each worker and update internal 
state on reply
-                workerIds.map { wid =>
-                  workerInterface.queryStatistics(EmptyRequest(), wid).map { 
resp =>
-                    collectedResults.addOne((exec.getWorkerExecution(wid), 
resp, System.nanoTime()))
+              cp.workflowExecution.getLatestOperatorExecutionOption(opId) 
match {
+                // Operator region has not been initialized yet; skip in this 
polling round.
+                case None => Seq.empty
+                case Some(exec) =>
+                  // Skip completed operators
+                  if (exec.getState == COMPLETED) {
+                    Seq.empty
+                  } else {
+                    // Select all workers for this operator
+                    val workerIds = exec.getWorkerIds
+
+                    // Send queryStatistics to each worker and update internal 
state on reply
+                    workerIds.map { wid =>
+                      workerInterface.queryStatistics(EmptyRequest(), wid).map 
{ resp =>
+                        collectedResults.addOne((exec.getWorkerExecution(wid), 
resp, System.nanoTime()))
+                      }
+                    }
                   }
-                }
               }
             }
           }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index c603c8ef34..fc4a54c0f9 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -30,7 +30,6 @@ import org.apache.texera.amber.engine.architecture.scheduling.config.{
   ResourceConfig
 }
 import org.apache.texera.amber.engine.common.AmberLogging
-import org.apache.texera.amber.engine.common.AmberLogging
 import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde
 import org.jgrapht.Graph
 import org.jgrapht.alg.connectivity.BiconnectivityInspector
@@ -64,6 +63,19 @@ class CostBasedScheduleGenerator(
       numStatesExplored: Int = 0
   )
 
+  /**
+    * Deterministic cache-aware propagation result under Assumption III (forced cache use).
+    */
+  private case class RequirednessAnalysis(
+      requiredSeedPorts: Set[GlobalPortIdentity],
+      requiredOutputPorts: Set[GlobalPortIdentity],
+      executeOperators: Set[PhysicalOpIdentity],
+      freshRequiredOutputPorts: Set[GlobalPortIdentity],
+      cacheHitRequiredOutputPorts: Set[GlobalPortIdentity],
+      freshRequiredLinks: Set[PhysicalLink],
+      cacheFedLinks: Set[PhysicalLink]
+  )
+
   private val costEstimator =
     new DefaultCostEstimator(
       workflowContext = workflowContext,
@@ -77,6 +89,30 @@ class CostBasedScheduleGenerator(
         GlobalPortIdentitySerde.deserializeFromString(serializedId) -> 
cachedOutput
     }
 
+  /**
+    * Search-time planning bindings applied during region construction.
+    *
+    * @param visibleFreshOutputPorts visible/sink output ports that must be freshly materialized.
+    * @param reuseOnlyOutputsByPort output ports that are required but must be served from cache (no rematerialization).
+    * @param cacheFedInputUrisByPort input ports pre-bound to cached upstream URIs.
+    */
+  private case class PlanningBindings(
+      visibleFreshOutputPorts: Set[GlobalPortIdentity],
+      reuseOnlyOutputsByPort: Map[GlobalPortIdentity, CachedOutput],
+      cacheFedInputUrisByPort: Map[GlobalPortIdentity, List[URI]]
+  )
+
+  private object PlanningBindings {
+    val empty: PlanningBindings = PlanningBindings(
+      visibleFreshOutputPorts = Set.empty,
+      reuseOnlyOutputsByPort = Map.empty,
+      cacheFedInputUrisByPort = Map.empty
+    )
+  }
+
+  private var activePlanningPlan: PhysicalPlan = initialPhysicalPlan
+  private var activePlanningBindings: PlanningBindings = PlanningBindings.empty
+
   private case class CostEstimatorMemoKey(
       physicalOpIds: Set[PhysicalOpIdentity],
       physicalLinkIds: Set[PhysicalLink],
@@ -84,173 +120,275 @@ class CostBasedScheduleGenerator(
       resourceConfig: Option[ResourceConfig]
   )
 
-  /**
-    * Classify regions as cached vs must-execute and rebuild port configs 
accordingly.
-    * A region is must-execute if it has a visible port without cache, or if 
it feeds a region
-    * that needs a materialization without cache (propagated upstream).
-    */
-  private def annotateRegionsWithCacheInfo(
-      regionDAG: DirectedAcyclicGraph[Region, RegionLink],
-      matEdges: Set[PhysicalLink],
-      opToRegionMap: Map[PhysicalOpIdentity, Region]
-  ): Unit = {
-    val regions = regionDAG.vertexSet().asScala.toSet
-
-    val needsByRegion = computePortsNeedingStorage(regions, matEdges, 
opToRegionMap)
-    val mustExecute = computeMustExecuteRegions(needsByRegion, matEdges, 
opToRegionMap)
-    val cachedRegions = regions.diff(mustExecute)
-    val outputConfigByPort = buildOutputPortConfigs(needsByRegion, 
cachedRegions)
-    val inputConfigByPort = buildInputPortConfigs(matEdges, outputConfigByPort)
-
-    // Rebuild each region with refreshed port configs and cached flag.
-    regions.foreach { region =>
-      val outputCfgs = needsByRegion(region).map(pid => pid -> 
outputConfigByPort(pid)).toMap
-      val inputCfgs = inputConfigByPort.filter { case (pid, _) => 
region.ports.contains(pid) }
-      val portConfigs = outputCfgs ++ inputCfgs
-      val newResourceConfig =
-        if (portConfigs.nonEmpty) Some(ResourceConfig(portConfigs = 
portConfigs))
-        else region.resourceConfig
+  private val costEstimatorMemoization
+      : mutable.Map[CostEstimatorMemoKey, (ResourceConfig, Double)] =
+    new mutable.HashMap()
 
-      val newRegion = region.copy(
-        resourceConfig = newResourceConfig,
-        cached = cachedRegions.contains(region)
+  def generate(): (Schedule, PhysicalPlan) = {
+    val startTime = System.nanoTime()
+    val analysis = analyzeRequiredness(initialPhysicalPlan)
+
+    val executeRegionDAG = new DirectedAcyclicGraph[Region, 
RegionLink](classOf[RegionLink])
+    if (analysis.executeOperators.nonEmpty) {
+      val residualPlan = buildResidualPlan(
+        initialPhysicalPlan,
+        analysis.executeOperators,
+        analysis.freshRequiredLinks
       )
-      replaceVertex(regionDAG, region, newRegion)
+      val residualBindings = buildResidualPlanningBindings(residualPlan, 
analysis)
+      val searchResult = searchOnPlan(residualPlan, residualBindings)
+      searchResult.regionDAG.vertexSet().forEach(region => 
executeRegionDAG.addVertex(region))
+      searchResult.regionDAG
+        .edgeSet()
+        .forEach(edge =>
+          executeRegionDAG.addEdge(
+            searchResult.regionDAG.getEdgeSource(edge),
+            searchResult.regionDAG.getEdgeTarget(edge),
+            edge
+          )
+        )
     }
-  }
 
-  private def computePortsNeedingStorage(
-      regions: Set[Region],
-      matEdges: Set[PhysicalLink],
-      opToRegionMap: Map[PhysicalOpIdentity, Region]
-  ): Map[Region, Set[GlobalPortIdentity]] = {
-    // Ports that require materialization for this run: scheduler-imposed 
(matEdges) + UI-visible ports.
-    regions.map { region =>
-      val fromMatEdges = matEdges
-        .filter(e => opToRegionMap(e.fromOpId) == region)
-        .map(e => GlobalPortIdentity(e.fromOpId, e.fromPortId))
-      val visiblePorts = 
workflowContext.workflowSettings.outputPortsNeedingStorage
-        .filter(pid => region.physicalOps.exists(_.id == pid.opId))
-      region -> (fromMatEdges ++ visiblePorts)
-    }.toMap
+    val skipRegions = buildSkipRegions(initialPhysicalPlan, analysis)
+    val finalRegionPlan = buildFinalRegionPlan(executeRegionDAG, skipRegions)
+    val totalRPGTime = System.nanoTime() - startTime
+    val schedule = generateScheduleFromRegionPlan(finalRegionPlan)
+    logger.info(
+      s"WID: ${workflowContext.workflowId.id}, EID: 
${workflowContext.executionId.id}, total RPG time: " +
+        s"${totalRPGTime / 1e6} ms."
+    )
+    (
+      schedule,
+      initialPhysicalPlan
+    )
   }
 
-  private def computeMustExecuteRegions(
-      needsByRegion: Map[Region, Set[GlobalPortIdentity]],
-      matEdges: Set[PhysicalLink],
-      opToRegionMap: Map[PhysicalOpIdentity, Region]
-  ): Set[Region] = {
-    // Seed with regions that have any needed port lacking cache; then 
propagate upstream along materialized edges.
-    val mustExecute = mutable.Set[Region]()
-    needsByRegion.foreach {
-      case (region, needs) =>
-        if (needs.exists(pid => !cachedOutputsByPort.contains(pid))) {
-          mustExecute += region
-        }
-    }
+  /**
+    * Computes deterministic requiredness and execute/skip decisions under Assumption III (forced cache use).
+    */
+  private def analyzeRequiredness(plan: PhysicalPlan): RequirednessAnalysis = {
+    val requiredSeeds =
+      workflowContext.workflowSettings.outputPortsNeedingStorage
+        .filter(pid => plan.operators.exists(_.id == pid.opId))
+
+    val requiredOutputPorts = mutable.Set[GlobalPortIdentity]()
+    requiredOutputPorts ++= requiredSeeds
+    val executeOperators = mutable.Set[PhysicalOpIdentity]()
+
     var changed = true
     while (changed) {
       changed = false
-      matEdges.foreach { e =>
-        val fromRegion = opToRegionMap(e.fromOpId)
-        val toRegion = opToRegionMap(e.toOpId)
-        val outPort = GlobalPortIdentity(e.fromOpId, e.fromPortId)
-        if (
-          mustExecute.contains(toRegion) &&
-          !cachedOutputsByPort.contains(outPort) &&
-          !mustExecute.contains(fromRegion)
-        ) {
-          mustExecute += fromRegion
-          changed = true
+      plan.topologicalIterator().foreach { opId =>
+        val op = plan.getOperator(opId)
+        val requiredOutputsOfOp =
+          op.outputPorts.keys
+            .map(portId => GlobalPortIdentity(op.id, portId))
+            .filter(requiredOutputPorts.contains)
+            .toSet
+        if (requiredOutputsOfOp.nonEmpty) {
+          val hasCacheMiss = requiredOutputsOfOp.exists(pid => 
!cachedOutputsByPort.contains(pid))
+          if (hasCacheMiss) {
+            if (!executeOperators.contains(opId)) {
+              executeOperators += opId
+              changed = true
+            }
+            plan.getUpstreamPhysicalLinks(opId).foreach { link =>
+              val upstreamOutput = GlobalPortIdentity(link.fromOpId, 
link.fromPortId)
+              if (!requiredOutputPorts.contains(upstreamOutput)) {
+                requiredOutputPorts += upstreamOutput
+                changed = true
+              }
+            }
+          }
         }
       }
     }
-    mustExecute.toSet
+
+    val freshRequiredOutputPorts =
+      requiredOutputPorts.filter(pid => 
!cachedOutputsByPort.contains(pid)).toSet
+    val cacheHitRequiredOutputPorts =
+      requiredOutputPorts.filter(cachedOutputsByPort.contains).toSet
+    val linksIntoExecute = plan.links.filter(link => 
executeOperators.contains(link.toOpId))
+    val freshRequiredLinks = linksIntoExecute
+      .filter(link =>
+        freshRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, 
link.fromPortId))
+      )
+      .toSet
+    val cacheFedLinks = linksIntoExecute
+      .filter(link =>
+        cacheHitRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, 
link.fromPortId))
+      )
+      .toSet
+
+    RequirednessAnalysis(
+      requiredSeedPorts = requiredSeeds,
+      requiredOutputPorts = requiredOutputPorts.toSet,
+      executeOperators = executeOperators.toSet,
+      freshRequiredOutputPorts = freshRequiredOutputPorts,
+      cacheHitRequiredOutputPorts = cacheHitRequiredOutputPorts,
+      freshRequiredLinks = freshRequiredLinks,
+      cacheFedLinks = cacheFedLinks
+    )
   }
 
-  private def buildOutputPortConfigs(
-      needsByRegion: Map[Region, Set[GlobalPortIdentity]],
-      cachedRegions: Set[Region]
-  ): Map[GlobalPortIdentity, OutputPortConfig] = {
-    // For cached regions, reuse cached URI + tuple count; otherwise allocate 
new URI with no cached count.
-    needsByRegion.flatMap {
-      case (region, ports) =>
-        ports.map { gpid =>
-          val (uri, cachedCount) =
-            if (cachedRegions.contains(region)) {
-              val cached = cachedOutputsByPort(gpid)
-              (cached.resultUri, cached.tupleCount)
-            } else {
-              (
-                createResultURI(
-                  workflowId = workflowContext.workflowId,
-                  executionId = workflowContext.executionId,
-                  globalPortId = gpid
-                ),
-                None
-              )
-            }
-          gpid -> OutputPortConfig(uri, cachedCount)
-        }
-    }
+  /**
+    * Builds a residual plan containing only execute operators and fresh-required dependencies.
+    */
+  private def buildResidualPlan(
+      plan: PhysicalPlan,
+      executeOperators: Set[PhysicalOpIdentity],
+      freshRequiredLinks: Set[PhysicalLink]
+  ): PhysicalPlan = {
+    val linksToRemove = plan.links.diff(freshRequiredLinks)
+    val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => 
acc.removeLink(link))
+    linksPrunedPlan.getSubPlan(executeOperators)
   }
 
-  private def buildInputPortConfigs(
-      matEdges: Set[PhysicalLink],
-      outputConfigByPort: Map[GlobalPortIdentity, OutputPortConfig]
-  ): Map[GlobalPortIdentity, IntermediateInputPortConfig] = {
-    matEdges
-      .groupBy(e => GlobalPortIdentity(e.toOpId, e.toPortId, input = true))
-      .map {
-        case (inputPid, links) =>
-          val uris = links
-            .map(link =>
-              outputConfigByPort(GlobalPortIdentity(link.fromOpId, 
link.fromPortId)).storageURI
-            )
-            .toList
-          inputPid -> IntermediateInputPortConfig(uris)
+  /**
+    * Builds the search-time binding context for residual Pasta planning.
+    * All bindings are prepared before search and consumed directly by createRegions.
+    */
+  private def buildResidualPlanningBindings(
+      residualPlan: PhysicalPlan,
+      analysis: RequirednessAnalysis
+  ): PlanningBindings = {
+    val residualOps = residualPlan.operators.map(_.id).toSet
+
+    val visibleFreshOutputPorts = analysis.requiredSeedPorts
+      .intersect(analysis.freshRequiredOutputPorts)
+      .filter(pid => residualOps.contains(pid.opId))
+
+    val reuseOnlyOutputsByPort = analysis.cacheHitRequiredOutputPorts
+      .filter(pid => residualOps.contains(pid.opId))
+      .flatMap(pid => cachedOutputsByPort.get(pid).map(cached => pid -> 
cached))
+      .toMap
+
+    val cacheFedInputUrisByPort = analysis.cacheFedLinks
+      .filter(link => residualOps.contains(link.toOpId))
+      .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) {
+        case (acc, link) =>
+          val inputPort = GlobalPortIdentity(link.toOpId, link.toPortId, input 
= true)
+          val outputPort = GlobalPortIdentity(link.fromOpId, link.fromPortId)
+          cachedOutputsByPort.get(outputPort) match {
+            case Some(cached) =>
+              val uris = acc.getOrElse(inputPort, List.empty) :+ 
cached.resultUri
+              acc.updated(inputPort, uris)
+            case None =>
+              acc
+          }
       }
+
+    PlanningBindings(
+      visibleFreshOutputPorts = visibleFreshOutputPorts,
+      reuseOnlyOutputsByPort = reuseOnlyOutputsByPort,
+      cacheFedInputUrisByPort = cacheFedInputUrisByPort
+    )
   }
 
-  private val costEstimatorMemoization
-      : mutable.Map[CostEstimatorMemoKey, (ResourceConfig, Double)] =
-    new mutable.HashMap()
+  /**
+    * Runs the original Pasta search on a provided plan with precomputed planning bindings.
+    */
+  private def searchOnPlan(
+      plan: PhysicalPlan,
+      planningBindings: PlanningBindings
+  ): SearchResult = {
+    val oldPlan = activePlanningPlan
+    val oldBindings = activePlanningBindings
+    try {
+      activePlanningPlan = plan
+      activePlanningBindings = planningBindings
+      costEstimatorMemoization.clear()
+      runSearchWithTimeout()
+    } finally {
+      activePlanningPlan = oldPlan
+      activePlanningBindings = oldBindings
+    }
+  }
 
-  def generate(): (Schedule, PhysicalPlan) = {
-    val startTime = System.nanoTime()
-    val regionDAG = createRegionDAG()
-    val totalRPGTime = System.nanoTime() - startTime
-    val regionPlan = RegionPlan(
-      regions = regionDAG.iterator().asScala.toSet,
-      regionLinks = regionDAG.edgeSet().asScala.toSet
-    )
-    val schedule = generateScheduleFromRegionPlan(regionPlan)
-    logger.info(
-      s"WID: ${workflowContext.workflowId.id}, EID: 
${workflowContext.executionId.id}, total RPG time: " +
-        s"${totalRPGTime / 1e6} ms."
-    )
-    (
-      schedule,
-      physicalPlan
+  /**
+    * Builds skip regions over operators excluded from residual Pasta planning.
+    */
+  private def buildSkipRegions(
+      plan: PhysicalPlan,
+      analysis: RequirednessAnalysis
+  ): Set[Region] = {
+    val skipOperators = 
plan.operators.map(_.id).diff(analysis.executeOperators)
+    if (skipOperators.isEmpty) {
+      return Set.empty
+    }
+    val skipInternalLinks = plan.links
+      .filter(link => skipOperators.contains(link.fromOpId) && 
skipOperators.contains(link.toOpId))
+    val linksToRemove = plan.links.diff(skipInternalLinks)
+    val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => 
acc.removeLink(link))
+    val skipPlan = linksPrunedPlan.getSubPlan(skipOperators)
+    val skipRegionSkeletons = createRegions(skipPlan, Set.empty)
+    skipRegionSkeletons.map { region =>
+      val reuseOnlyOutputConfigs = analysis.cacheHitRequiredOutputPorts
+        .filter(pid => region.physicalOps.exists(_.id == pid.opId))
+        .flatMap { outputPort =>
+          cachedOutputsByPort.get(outputPort).map { cached =>
+            outputPort -> OutputPortConfig(
+              storageURI = cached.resultUri,
+              cachedTupleCount = cached.tupleCount,
+              materialize = false
+            )
+          }
+        }
+        .toMap
+      region.copy(
+        cached = true,
+        resourceConfig = Some(ResourceConfig(portConfigs = 
reuseOnlyOutputConfigs))
+      )
+    }
+  }
+
+  /**
+    * Produces the final full region plan by combining skip regions and execute regions
+    * (from residual Pasta), then reindexing region IDs with skipped regions first.
+    */
+  private def buildFinalRegionPlan(
+      executeRegionDAG: DirectedAcyclicGraph[Region, RegionLink],
+      skipRegions: Set[Region]
+  ): RegionPlan = {
+    val executeRegions = executeRegionDAG.vertexSet().asScala.toSet
+    val executeLinks = executeRegionDAG.edgeSet().asScala.toSet
+    val allRegions = executeRegions ++ skipRegions
+    val orderedRegions = allRegions.toSeq.sortBy(region =>
+      (if (skipRegions.contains(region)) 0 else 1, region.id.id)
     )
+    val remappedRegionByRegion = orderedRegions.zipWithIndex.map {
+      case (region, idx) => region -> region.copy(id = 
RegionIdentity(idx.toLong))
+    }.toMap
+    val executeRegionById = executeRegions.map(region => region.id -> 
region).toMap
+
+    val remappedRegions = remappedRegionByRegion.values.toSet
+    val remappedExecuteLinks = executeLinks.map { link =>
+      val fromRegion = executeRegionById(link.fromRegionId)
+      val toRegion = executeRegionById(link.toRegionId)
+      RegionLink(
+        remappedRegionByRegion(fromRegion).id,
+        remappedRegionByRegion(toRegion).id
+      )
+    }
+    RegionPlan(remappedRegions, remappedExecuteLinks)
   }
 
   /**
-    * Partitions a physical plan into Regions (skeletons). Cache vs 
must-execute
-    * classification and URI assignment are applied later in 
annotateRegionsWithCacheInfo
-    * using the RegionDAG and materialization edges.
+    * Partitions a physical plan into Regions and assigns search-time port bindings in two passes.
     *
-    * @param physicalPlan the original physical plan (without materializations)
-    * @param matEdges     edges to be materialized (including blocking edges)
-    * @return a set of `Region` skeletons (resourceConfig/cached filled later)
+    * Pass 1 assigns output bindings for:
+    * - outputs of materialized edges in the current search state,
+    * - visible required outputs that must be freshly produced,
+    * - reuse-only required outputs that must bind to cached URIs.
+    *
+    * Pass 2 builds input bindings by wiring:
+    * - materialized-edge reader URIs from pass 1,
+    * - cache-fed input URIs precomputed during requiredness analysis.
     */
   private def createRegions(
       physicalPlan: PhysicalPlan,
       matEdges: Set[PhysicalLink]
   ): Set[Region] = {
-
-    // remove materialized edges and create connected components
-
     val matEdgesRemovedDAG: PhysicalPlan = 
matEdges.foldLeft(physicalPlan)(_.removeLink(_))
 
     val connectedComponents: Set[Graph[PhysicalOpIdentity, PhysicalLink]] =
@@ -258,12 +396,8 @@ class CostBasedScheduleGenerator(
         matEdgesRemovedDAG.dag
       ).getConnectedComponents.asScala.toSet
 
-    //  build region skeletons with no materialization information
-
-    val regionSkeletons: Set[Region] = connectedComponents.zipWithIndex.map {
+    val regionsWithOutputBindings: Set[Region] = 
connectedComponents.zipWithIndex.map {
       case (connectedSubDAG, idx) =>
-        // Operators and intra‑region pipelined links
-
         val operators: Set[PhysicalOpIdentity] = 
connectedSubDAG.vertexSet().asScala.toSet
 
         val links: Set[PhysicalLink] = operators
@@ -276,7 +410,6 @@ class CostBasedScheduleGenerator(
 
         val physicalOps: Set[PhysicalOp] = 
operators.map(physicalPlan.getOperator)
 
-        // Enumerate all ports belonging to the Region
         val ports: Set[GlobalPortIdentity] = physicalOps.flatMap { op =>
           op.inputPorts.keys
             .map(inputPortId => GlobalPortIdentity(op.id, inputPortId, input = 
true))
@@ -285,19 +418,103 @@ class CostBasedScheduleGenerator(
             .toSet
         }
 
-        // Build the Region skeleton; cache/resourceConfig to be populated 
after classification.
+        val outputPortIdsFromMatEdges = matEdges
+          .filter(edge => operators.contains(edge.fromOpId))
+          .map(edge => GlobalPortIdentity(edge.fromOpId, edge.fromPortId))
+        val visibleFreshOutputPortIds = 
activePlanningBindings.visibleFreshOutputPorts
+          .filter(portId => operators.contains(portId.opId))
+        val reuseOnlyOutputPortIds = 
activePlanningBindings.reuseOnlyOutputsByPort.keySet
+          .filter(portId => operators.contains(portId.opId))
+        val outputPortIdsNeedingBindings =
+          outputPortIdsFromMatEdges ++ visibleFreshOutputPortIds ++ 
reuseOnlyOutputPortIds
+
+        val outputPortConfigs = outputPortIdsNeedingBindings.map { 
outputPortId =>
+          activePlanningBindings.reuseOnlyOutputsByPort.get(outputPortId) 
match {
+            case Some(cachedOutput) =>
+              outputPortId -> OutputPortConfig(
+                storageURI = cachedOutput.resultUri,
+                cachedTupleCount = cachedOutput.tupleCount,
+                materialize = false
+              )
+            case None =>
+              outputPortId -> OutputPortConfig(
+                storageURI = createResultURI(
+                  workflowId = workflowContext.workflowId,
+                  executionId = workflowContext.executionId,
+                  globalPortId = outputPortId
+                ),
+                cachedTupleCount = None,
+                materialize = true
+              )
+          }
+        }.toMap
+
+        val resourceConfig =
+          if (outputPortConfigs.nonEmpty) Some(ResourceConfig(portConfigs = 
outputPortConfigs))
+          else None
+
         Region(
           id = RegionIdentity(idx),
           physicalOps = physicalOps,
           physicalLinks = links,
           ports = ports,
-          resourceConfig = None,
+          resourceConfig = resourceConfig,
           cached = false
         )
     }
 
-    // Regions are returned as skeletons; cache/URI assignment happens in 
annotateRegionsWithCacheInfo.
-    regionSkeletons
+    val regionByOperator = regionsWithOutputBindings
+      .flatMap(region => region.getOperators.map(op => op.id -> region))
+      .toMap
+
+    regionsWithOutputBindings.map { region =>
+      val inputUrisFromMatEdges = matEdges
+        .filter(edge => region.physicalOps.exists(_.id == edge.toOpId))
+        .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) {
+          case (acc, matEdge) =>
+            val globalInputPortId =
+              GlobalPortIdentity(matEdge.toOpId, matEdge.toPortId, input = 
true)
+            val globalOutputPortId = GlobalPortIdentity(matEdge.fromOpId, 
matEdge.fromPortId)
+            val inputReaderUri = regionByOperator(matEdge.fromOpId)
+              .resourceConfig
+              .get
+              .portConfigs(globalOutputPortId)
+              .storageURIs
+              .head
+            acc.updated(
+              globalInputPortId,
+              acc.getOrElse(globalInputPortId, List.empty[URI]) :+ 
inputReaderUri
+            )
+        }
+
+      val cacheFedInputUris = activePlanningBindings.cacheFedInputUrisByPort
+        .filter { case (inputPortId, _) => region.ports.contains(inputPortId) }
+        .toMap
+
+      val mergedInputUris = (inputUrisFromMatEdges.keySet ++ 
cacheFedInputUris.keySet)
+        .map { inputPortId =>
+          val uris =
+            inputUrisFromMatEdges.getOrElse(inputPortId, List.empty) ++
+              cacheFedInputUris.getOrElse(inputPortId, List.empty)
+          inputPortId -> uris
+        }
+        .toMap
+
+      val inputPortConfigs = mergedInputUris.map {
+        case (inputPortId, uris) =>
+          inputPortId -> IntermediateInputPortConfig(uris)
+      }
+
+      val existingPortConfigs = 
region.resourceConfig.map(_.portConfigs).getOrElse(Map.empty)
+      val newResourceConfig =
+        if (inputPortConfigs.nonEmpty || existingPortConfigs.nonEmpty) {
+          Some(ResourceConfig(portConfigs = existingPortConfigs ++ 
inputPortConfigs))
+        } else {
+          None
+        }
+
+      region.copy(resourceConfig = newResourceConfig)
+    }
   }
 
   /**
@@ -313,7 +530,7 @@ class CostBasedScheduleGenerator(
     val regionDAG = new DirectedAcyclicGraph[Region, 
RegionLink](classOf[RegionLink])
     val regionGraph = new DirectedPseudograph[Region, 
RegionLink](classOf[RegionLink])
     val opToRegionMap = new mutable.HashMap[PhysicalOpIdentity, Region]
-    createRegions(physicalPlan, matEdges).foreach(region => {
+    createRegions(activePlanningPlan, matEdges).foreach(region => {
       region.getOperators.foreach(op => opToRegionMap(op.id) = region)
       regionGraph.addVertex(region)
       regionDAG.addVertex(region)
@@ -330,19 +547,13 @@ class CostBasedScheduleGenerator(
           isAcyclic = false
       }
     })
-    if (isAcyclic) {
-      annotateRegionsWithCacheInfo(regionDAG, matEdges, opToRegionMap.toMap)
-      Left(regionDAG)
-    } else Right(regionGraph)
+    if (isAcyclic) Left(regionDAG) else Right(regionGraph)
   }
 
   /**
-    * Performs a search to generate a region DAG.
-    * Materializations are added only after the plan is determined to be 
schedulable.
-    *
-    * @return A region DAG.
+    * Runs Pasta search with timeout handling and returns the selected search 
result.
     */
-  private def createRegionDAG(): DirectedAcyclicGraph[Region, RegionLink] = {
+  private def runSearchWithTimeout(): SearchResult = {
     val searchResultFuture: Future[SearchResult] = Future {
       workflowContext.workflowSettings.executionMode match {
         case ExecutionMode.MATERIALIZED =>
@@ -376,9 +587,7 @@ class CostBasedScheduleGenerator(
       s"WID: ${workflowContext.workflowId.id}, EID: 
${workflowContext.executionId.id}, search time: " +
         s"${searchResult.searchTimeNanoSeconds / 1e6} ms."
     )
-
-    val regionDAG = searchResult.regionDAG
-    regionDAG
+    searchResult
   }
 
   /**
@@ -400,10 +609,10 @@ class CostBasedScheduleGenerator(
     val startTime = System.nanoTime()
     val originalNonBlockingEdges =
       if (oCleanEdges) {
-        physicalPlan.getNonBridgeNonBlockingLinks
+        activePlanningPlan.getNonBridgeNonBlockingLinks
       } else {
-        physicalPlan.links.diff(
-          physicalPlan.getBlockingAndDependeeLinks
+        activePlanningPlan.links.diff(
+          activePlanningPlan.getBlockingAndDependeeLinks
         )
       }
     // Queue to hold states to be explored, starting with the empty set
@@ -434,7 +643,7 @@ class CostBasedScheduleGenerator(
         }
         visited.add(currentState)
         tryConnectRegionDAG(
-          physicalPlan.getBlockingAndDependeeLinks ++ currentState
+          activePlanningPlan.getBlockingAndDependeeLinks ++ currentState
         ) match {
           case Left(regionDAG) =>
             updateOptimumIfApplicable(regionDAG)
@@ -465,12 +674,12 @@ class CostBasedScheduleGenerator(
         */
       def addNeighborStatesToFrontier(): Unit = {
         val allCurrentMaterializedEdges =
-          currentState ++ physicalPlan.getBlockingAndDependeeLinks
+          currentState ++ activePlanningPlan.getBlockingAndDependeeLinks
         // Generate and enqueue all neighbour states that haven't been visited
         var candidateEdges = originalNonBlockingEdges
           .diff(currentState)
         if (oChains) {
-          val edgesInChainWithMaterializedEdges = physicalPlan.maxChains
+          val edgesInChainWithMaterializedEdges = activePlanningPlan.maxChains
             .filter(chain => 
chain.intersect(allCurrentMaterializedEdges).nonEmpty)
             .flatten
           candidateEdges = candidateEdges.diff(
@@ -501,7 +710,7 @@ class CostBasedScheduleGenerator(
           if (filteredNeighborStates.nonEmpty) {
             val minCostNeighborState = 
filteredNeighborStates.minBy(neighborState =>
               tryConnectRegionDAG(
-                physicalPlan.getBlockingAndDependeeLinks ++ neighborState
+                activePlanningPlan.getBlockingAndDependeeLinks ++ neighborState
               ) match {
                 case Left(regionDAG) =>
                   allocateResourcesAndEvaluateCost(regionDAG)
@@ -527,7 +736,7 @@ class CostBasedScheduleGenerator(
     val startTime = System.nanoTime()
 
     val (regionDAG, cost) =
-      tryConnectRegionDAG(physicalPlan.links) match {
+      tryConnectRegionDAG(activePlanningPlan.links) match {
         case Left(dag) => (dag, allocateResourcesAndEvaluateCost(dag))
         case Right(_) =>
           (
@@ -561,14 +770,14 @@ class CostBasedScheduleGenerator(
   ): SearchResult = {
     val startTime = System.nanoTime()
     // Starting from a state where all non-blocking edges are materialized
-    val originalSeedState = physicalPlan.links.diff(
-      physicalPlan.getBlockingAndDependeeLinks
+    val originalSeedState = activePlanningPlan.links.diff(
+      activePlanningPlan.getBlockingAndDependeeLinks
     )
 
     // Chain optimization: an edge in the same chain as a blocking edge should 
not be materialized
     val seedStateOptimizedByChainsIfApplicable = if (oChains) {
-      val edgesInChainWithBlockingEdge = physicalPlan.maxChains
-        .filter(chain => 
chain.intersect(physicalPlan.getBlockingAndDependeeLinks).nonEmpty)
+      val edgesInChainWithBlockingEdge = activePlanningPlan.maxChains
+        .filter(chain => 
chain.intersect(activePlanningPlan.getBlockingAndDependeeLinks).nonEmpty)
         .flatten
       originalSeedState.diff(edgesInChainWithBlockingEdge)
     } else {
@@ -577,7 +786,7 @@ class CostBasedScheduleGenerator(
 
     // Clean edge optimization: a clean edge should not be materialized
     val finalSeedState = if (oCleanEdges) {
-      
seedStateOptimizedByChainsIfApplicable.intersect(physicalPlan.getNonBridgeNonBlockingLinks)
+      
seedStateOptimizedByChainsIfApplicable.intersect(activePlanningPlan.getNonBridgeNonBlockingLinks)
     } else {
       seedStateOptimizedByChainsIfApplicable
     }
@@ -597,7 +806,7 @@ class CostBasedScheduleGenerator(
       val currentState = queue.dequeue()
       visited.add(currentState)
       tryConnectRegionDAG(
-        physicalPlan.getBlockingAndDependeeLinks ++ currentState
+        activePlanningPlan.getBlockingAndDependeeLinks ++ currentState
       ) match {
         case Left(regionDAG) =>
           updateOptimumIfApplicable(regionDAG)
@@ -640,7 +849,7 @@ class CostBasedScheduleGenerator(
           if (unvisitedNeighborStates.nonEmpty) {
             val minCostNeighborState = 
unvisitedNeighborStates.minBy(neighborState =>
               tryConnectRegionDAG(
-                physicalPlan.getBlockingAndDependeeLinks ++ neighborState
+                activePlanningPlan.getBlockingAndDependeeLinks ++ neighborState
               ) match {
                 case Left(regionDAG) =>
                   allocateResourcesAndEvaluateCost(regionDAG)
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index e292cddf0b..fe34536c47 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -50,6 +50,7 @@ import 
org.apache.texera.amber.engine.architecture.scheduling.config.{
   InputPortConfig,
   OperatorConfig,
   OutputPortConfig,
+  PortConfig,
   ResourceConfig
 }
 import 
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning
@@ -140,8 +141,9 @@ class RegionExecutionCoordinator(
       val outputMetrics = resourceConfig.portConfigs
         .collect {
           case (gpid, cfg: OutputPortConfig) if gpid.opId == op.id =>
-            // Only emit metrics for materialized outputs; UI treats missing 
ports as skipped.
-            val count = cfg.cachedTupleCount.getOrElse(0L)
+            // Emit metrics only for configured output ports in this cached 
region.
+            // Use -1 to preserve unknown cached counts in UI/stats instead of 
reporting 0.
+            val count = cfg.cachedTupleCount.getOrElse(-1L)
             PortTupleMetricsMapping(gpid.portId, TupleMetrics(count, 0L))
         }
         .toSeq
@@ -317,10 +319,12 @@ class RegionExecutionCoordinator(
 
   private def executeNonDependeePortPhase(): Future[Unit] = {
     setPhase(ExecutingNonDependeePortsPhase)
-    // Allocate output port storage objects
+    // Register reuse-only output bindings (cache-hit ports) without creating 
new storage objects.
+    registerReuseOnlyOutputPortResults(region.resourceConfig.get.portConfigs)
+    // Allocate output port storage objects for fresh materializations only.
     region.resourceConfig.get.portConfigs
       .collect {
-        case (id, cfg: OutputPortConfig) => id -> cfg
+        case (id, cfg: OutputPortConfig) if cfg.materialize => id -> cfg
       }
       .foreach {
         case (pid, cfg) =>
@@ -502,7 +506,7 @@ class RegionExecutionCoordinator(
                             if gid == GlobalPortIdentity(
                               opId = physicalOp.id,
                               portId = outputPortId
-                            ) =>
+                            ) && cfg.materialize =>
                           cfg.storageURI.toString
                       }
                       .getOrElse("")
@@ -538,6 +542,25 @@ class RegionExecutionCoordinator(
     )
   }
 
+  /**
+    * Persists result URI bindings for reuse-only output ports so consumers 
and UI can
+    * resolve cached results in the current execution without 
rematerialization.
+    */
+  private def registerReuseOnlyOutputPortResults(
+      portConfigs: Map[GlobalPortIdentity, PortConfig]
+  ): Unit = {
+    portConfigs.foreach {
+      case (outputPortId, outputCfg: OutputPortConfig)
+          if !outputPortId.input && !outputCfg.materialize =>
+        WorkflowExecutionsResource.insertOperatorPortResultUri(
+          eid = executionId,
+          globalPortId = outputPortId,
+          uri = outputCfg.storageURI
+        )
+      case _ =>
+    }
+  }
+
   private def connectChannels(links: Set[PhysicalLink]): 
Future[Seq[EmptyReturn]] = {
     if (region.cached) {
       return Future.value(Seq.empty)
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala
index 8d6c616f79..cac339927d 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala
@@ -31,8 +31,18 @@ sealed trait PortConfig {
   def storageURIs: List[URI]
 }
 
-/** An output port requires exactly one materialization URI. */
-final case class OutputPortConfig(storageURI: URI, cachedTupleCount: 
Option[Long] = None)
+/**
+  * Output port configuration for scheduling/runtime.
+  *
+  * @param storageURI URI bound at planning time for this output port.
+  * @param cachedTupleCount Optional cached tuple count for UI/metrics when 
serving from cache.
+  * @param materialize When false, this output is reuse-only (cache-hit) and 
must not be freshly materialized.
+  */
+final case class OutputPortConfig(
+    storageURI: URI,
+    cachedTupleCount: Option[Long] = None,
+    materialize: Boolean = true
+)
     extends PortConfig {
   override val storageURIs: List[URI] = List(storageURI)
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala 
b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
index a2e2591802..7476a99d3c 100644
--- a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
+++ b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala
@@ -156,7 +156,7 @@ class OperatorPortCacheDao(sqlServer: SqlServer) {
 
   /**
     * Insert or update a cache entry (upsert).
-    * On conflict (workflow_id, global_port_id, subdag_hash), updates the 
existing record.
+    * On conflict (workflow_id, global_port_id, subdag_hash), updates the 
existing record and refreshes updated_at.
     *
     * @param record OperatorPortCacheRecord to insert/update
     */
@@ -183,6 +183,7 @@ class OperatorPortCacheDao(sqlServer: SqlServer) {
       .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, dbRecord.getFingerprintJson)
       .set(OPERATOR_PORT_CACHE.TUPLE_COUNT, dbRecord.getTupleCount)
       .set(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, 
dbRecord.getSourceExecutionId)
+      .set(OPERATOR_PORT_CACHE.UPDATED_AT, DSL.currentOffsetDateTime())
       .execute()
   }
 
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index a98e9801c7..ddf99413d6 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -2,18 +2,18 @@
 
 ## Objective
 
-Enable **incremental workflow execution** through cost-aware caching of 
operator output ports. When users iteratively refine workflows (modify 
parameters, add/remove operators), the system should:
+Enable **incremental workflow execution** through deterministic cache reuse of 
operator output ports. Under MVP Assumption III (forced cache use: a cache hit 
is always reused), when users iteratively refine workflows (modify parameters, 
add/remove operators), the system should:
 
 1. **Reuse cached results** where upstream computation is unchanged
-2. **Make cost-aware decisions** comparing cache read cost vs recomputation 
cost
+2. **Never recompute a required output port with a cache hit** (forced cache 
use)
 3. **Maintain correctness** via deterministic fingerprinting of upstream 
sub-DAGs
-4. **Preserve physical plan immutability** — caching is scheduler metadata, 
not plan modification
+4. **Preserve physical plan immutability**: skip/execute/cache/fresh are 
scheduler annotations, not plan mutations
 
-**Key principle**: The physical plan remains unchanged; cache-or-recompute 
decisions are made by the scheduler (Pasta / CostBasedScheduleGenerator) based 
on cache metadata keyed by a deterministic SHA-256 fingerprint of the upstream 
sub-DAG.
+**Key principle (MVP)**: The physical plan remains unchanged. Scheduling is a 
two-step process in `CostBasedScheduleGenerator`: (1) deterministic 
requiredness propagation with forced cache use, then (2) original Pasta 
optimization on the residual fresh-required structure only.
 
-**Research goals**:
+**Future research goals**:
 
-- Extend Pasta scheduler with cost-based caching decisions
+- Extend Pasta scheduler with cost-based cache-vs-recompute decisions
 - Develop cost models and pruning heuristics for what/when to cache
 - Evaluate speedup on iterative data science workflows
 - (Optional) Lifecycle management: eviction, invalidation, garbage collection
@@ -22,7 +22,7 @@ Enable **incremental workflow execution** through cost-aware 
caching of operator
 
 ### 1. Physical Plan Immutability
 
-The physical plan is never modified to accommodate caching. Cache decisions 
are metadata passed to the scheduler via `WorkflowSettings.cachedOutputs`. This 
preserves the integrity of the Pasta scheduling framework.
+The physical plan is never modified to accommodate caching. Cache decisions 
are metadata passed to the scheduler via `WorkflowSettings.cachedOutputs` and 
required output ports.
 
 ### 2. Fingerprint-Based Correctness
 
@@ -38,21 +38,31 @@ A region is either fully cached (ToSkip) or fully executed 
(ToExecute) — no pa
 
 - Scheduler logic (binary cached flag per region)
 - Runtime state management (consistent execution mode)
-- Cost model (compare full region costs)
+- MVP planning (skip regions are excluded from Pasta planning scope)
 
-### 4. Shallow State Hierarchy for Cached Regions
+### 4. Forced Cache Use (Assumption III)
+
+For any required output port:
+
+- If a cache entry exists, the requirement is satisfied from the cache.
+- The port is not recomputed to satisfy that requirement.
+- There is no per-port or per-region cache-vs-recompute cost decision in MVP.
+
+This includes mixed-output operators: if one required output port is a cache 
miss and another required output port is a cache hit, the operator may execute, 
but consumers of the cache-hit output must still bind to the cached URI.
+
+### 5. Shallow State Hierarchy for Cached Regions
 
 ToSkip regions create lightweight state structures (Workflow → Region → 
Operator/Link) and store cached metrics at the operator level. No 
Worker/Channel states are created, so `numWorkers=0` and no worker assignments 
are emitted.
 
-### 5. Stats Emission via Direct Client Updates
+### 6. Stats Emission via Direct Client Updates
 
 Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via 
`asyncRPCClient.sendToClient()`, with cached operator metrics (`numWorkers=0`). 
This reuses existing stats infrastructure without special-casing the frontend.
 
-### 6. Explicit Cache State in Metrics
+### 7. Explicit Cache State in Metrics
 
 Cached operators use a dedicated `COMPLETED_FROM_CACHE` state in 
`WorkflowAggregatedState` protobuf enum. This provides clear visual feedback to 
users and distinguishes cache-hit completion from normal execution completion.
 
-### 7. Deferred Lifecycle Management
+### 8. Deferred Lifecycle Management
 
 V1 assumes unlimited storage. Eviction, TTL, and garbage collection are 
research topics for future work, not implementation blockers.
 
@@ -99,26 +109,33 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage 
collection are research
 
 - Physical plan (immutable)
 - `cachedOutputs` (Δ): map of cache hits from step 1
-- Visible ports (☐): ports that must be materialized for user visibility
-
-**Region Classification**:
-
-- **Homogeneity constraint**: A region is either fully cached (ToSkip) or 
fully executed (ToExecute) — no mixing within a region
-- **ToExecute regions**: Contain visible ports without cache hits OR depend on 
uncached intermediate materializations
-- **ToSkip regions**: All required output ports have cache hits AND no visible 
ports need fresh computation
-
-**Cost Model** (currently simple, needs refinement for research):
-
-- Cached regions: cost = 0
-- Executing regions: cost = (# operators × DEFAULT_OPERATOR_COST) + 
materialization read/write costs
-- Cache read/write: small fixed costs (0.5 per port)
-- **Future**: Historical stats-based estimation from `runtime_statistics` table
+- Required output ports (first-class): sink output ports + visible 
intermediate output ports (`outputPortsNeedingStorage`)
+
+**MVP Scheduling Flow (Assumption III)**:
+
+1. **Deterministic requiredness propagation** (port/edge aware):
+   - Seed required outports with required sink + visible ports.
+   - For each operator, if any required outport is a cache miss, mark the 
operator `Execute`; otherwise mark it `Skip`.
+   - For each `Execute` operator, mark all upstream outports on incoming edges 
as required.
+   - Iterate to fixed point.
+2. **Classify inputs of executing operators**:
+   - `Cache-fed` if the upstream required outport has a cache hit.
+   - `Fresh-required` if the upstream required outport has a cache miss.
+3. **Residual Pasta optimization**:
+   - Build residual planning structure containing only `Execute` operators and 
`Fresh-required` dependencies.
+   - Run original Pasta search/regioning/materialization on this residual 
structure.
+   - Keep blocking-edge constraints within the residual scope.
+4. **Assemble final full schedule**:
+   - Include residual execute regions (`ToExecute`) and skipped regions 
(`ToSkip`) in one schedule.
+   - Regions remain homogeneous (`cached=true/false`).
+   - All URI bindings are fixed at planning time.
+   - Cache-hit required outputs are marked as reuse-only and are never 
re-materialized during execution.
 
 **Output**:
 
-- Schedule with regions marked `cached=true/false`
-- Port configs updated with cached URIs for ToSkip regions
-- Cached tuple counts reused in port metadata
+- Single schedule over the full workflow with both `ToSkip` and `ToExecute` 
regions
+- Planning-time URI bindings for cache-fed inputs and fresh-required outputs
+- Reuse-only output semantics for cache-hit required ports to suppress 
rematerialization
 
 ### 3. Runtime Execution
 
@@ -142,7 +159,7 @@ Entry point: `RegionExecutionCoordinator` constructor 
branches on `region.cached
    - Input/output tuple counts from cached metadata
    - **UI**: The graph view displays `-` for cached input counts and for 
cached output ports that were not materialized; worker counts show `from cache`.
    - **Note**: Cached stats are synthetic (inputs default to 0; 
non-materialized outputs may be omitted). Do not use them for cost modeling 
until we add explicit tagging/filtering in `runtime_statistics`.
-5. **Propagate cached URIs**: Downstream operators receive cached `result_uri` 
for materialized inputs
+5. **Planning-time URI bindings**: Downstream operators consume URIs bound 
during scheduling; no runtime cache-vs-fresh URI decisions
 6. **No WorkerAssignmentUpdate**: Cached regions don't send worker assignment 
events (consistent with numWorkers=0)
 7. **Set phase to Completed**: Region lifecycle completes immediately
 
@@ -169,6 +186,11 @@ Entry point: `RegionExecutionCoordinator` constructor 
branches on `region.cached
 
 **Architecture note**: Event-based communication follows existing controller 
pattern - handler emits events via `sendToClient()`, service layer registers 
callbacks to handle them. Clean separation: engine layer knows nothing about 
web/service layer.
 
+**Forced-cache nuance**:
+
+- If an operator executes because one of its required outputs is a cache miss, 
other required outputs on the same operator that are cache hits are reuse-only.
+- Those cache-hit outputs are not freshly materialized in this run, and 
consumers remain bound to cached URIs.
+
 ### 4. Client-Side State Management
 
 **Location**: `ExecutionStatsService`, `ExecutionStateStore`
@@ -325,7 +347,7 @@ Phase 1.1 Service/DAO architecture is complete. Key 
components:
 
 ### 7. Testing Strategy
 
-- **Unit tests**: Fingerprint determinism, cost model logic, region 
classification
+- **Unit tests**: Fingerprint determinism, forced-cache propagation logic, 
region classification
 - **Integration tests**: Cache upsert → DB verification, cache lookup → region 
marking
 - **E2E tests**: Run workflow → populate cache → re-run → verify ToSkip 
behavior and result correctness
 - **Change detection**: Modify operator params → verify fingerprint mismatch → 
cache miss
@@ -368,11 +390,20 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
 - **Fingerprinting**: `FingerprintUtil` implemented with workflow-based specs 
for deterministic subDAG hashing
 - **Submission-time lookup**: `WorkflowExecutionService` uses 
`OperatorPortCacheService.lookupCachedOutputs()` to compute fingerprints for 
all physical output ports, queries cache, stores hits in 
`WorkflowSettings.cachedOutputs`
 - **Cache persistence**: `PortCompletedHandler` emits `PortMaterialized` event 
→ `ExecutionCacheService` → `OperatorPortCacheService.upsertCachedOutput()` → 
`OperatorPortCacheDao.upsert()` (includes fingerprint, URI, tuple count)
-- **Scheduler integration**: `CostBasedScheduleGenerator` marks regions cached 
when all required outputs have hits, reuses cached URIs in port configs
+- **Scheduler integration**:
+  - Implemented behavior (current branch): deterministic requiredness 
propagation + residual Pasta + full schedule assembly with skip regions 
excluded from Pasta planning scope
+  - Planning bindings are prepared before Pasta and consumed during 
`createRegions` in search:
+    - fresh-required outputs receive fresh URIs,
+    - cache-hit required outputs are bound as reuse-only (`materialize=false`),
+    - cache-fed inputs are pre-bound to cached URIs.
+  - No post-search region re-annotation step; resource allocation remains in 
Pasta search (`allocateResourcesAndEvaluateCost`) and final execute-region 
input configs stay as `InputPortConfig`.
 - **Runtime execution**: `RegionExecutionCoordinator` branches on 
`region.cached` flag:
   - ToSkip regions: `completeCachedRegion()` records cached operator metrics 
(numWorkers=0, processingTime=0) without creating workers, propagates cached 
URIs downstream
   - ToExecute regions: normal execution path
 - **Stats emission**: Cached regions emit `ExecutionStatsUpdate` via direct 
client updates, maintaining consistency with normal execution lifecycle
+- **Controller stats query robustness**:
+  - `QueryWorkerStatisticsHandler` now uses optional operator-execution lookup 
and skips operators not yet initialized.
+  - This avoids `None.get` during global stats polling when schedule levels 
are launched incrementally (including skip-first ordering where some execute 
regions are still pending initialization).
 - **Frontend cache visualization** (Phase 1.2 - Complete):
   - Added `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf 
enum
   - Added `CompletedFromCache` phase to `RegionExecutionCoordinator` for 
cached region lifecycle
@@ -400,14 +431,14 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
 The cache system integrates with three layers:
 
 1. **Execution Planning Layer**: Cache lookup at workflow submission, 
fingerprint computation
-2. **Scheduler Layer (Pasta)**: Cost-based cache-or-recompute decisions, 
region classification
-3. **Runtime Layer**: Short-circuit execution for cached regions, state 
management, stats emission
+2. **Scheduler Layer (Pasta)**: Deterministic forced-cache propagation + 
residual Pasta optimization + full schedule assembly
+3. **Runtime Layer**: Short-circuit execution for cached regions, 
planning-time URI binding consumption, stats emission
 
 ## Research Contributions
 
-### 1. Cost Model & Pruning (Primary Contribution)
+### 1. Cost Model & Pruning (Future Contribution)
 
-**Goal**: Determine when caching is beneficial and what outputs to cache.
+**Goal**: Determine when caching is beneficial and what outputs to cache, 
beyond MVP Assumption III.
 
 **Components**:
 
@@ -417,9 +448,9 @@ The cache system integrates with three layers:
   - Skip caching small outputs (recompute is cheap)
   - Skip terminal operators (no downstream reuse)
   - Skip low-reuse-probability operators
-- **Cache-or-recompute decision**: Per-region cost comparison (cache read vs 
recompute)
+- **Cache-or-recompute decision**: Per-region or per-port comparison (cache 
read vs recompute)
 
-**Current status**: Simple cost model (cached=0, execute>0). Need to develop 
historical stats-based estimation.
+**Current MVP status**: Cost-based decisions are disabled for the merge target. 
The MVP always reuses the cache when a required output port has a cache hit.
 
 **Data source**: `runtime_statistics` table already captures execution metrics 
(data/control processing time, tuple counts, worker counts per operator).
 

Reply via email to