This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 0c0f2a7734bacd52a7a96bdf19ace75a66571ee5
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Feb 9 17:39:00 2026 -0800

    feat(cache): seperate cache reuse logic outside of Pasta.
---
 .../controller/CacheReusePreSchedulingStep.scala   | 315 +++++++++++
 .../controller/WorkflowScheduler.scala             |  28 +-
 .../scheduling/CostBasedScheduleGenerator.scala    | 585 +++++++--------------
 .../architecture/scheduling/CostEstimator.scala    | 119 ++++-
 .../scheduling/PreSchedulingHints.scala            |  42 ++
 docs/operator-port-cache.md                        |  22 +-
 6 files changed, 692 insertions(+), 419 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala
new file mode 100644
index 0000000000..122eadf2da
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller
+
+import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
+import org.apache.texera.amber.core.workflow.{
+  CachedOutput,
+  GlobalPortIdentity,
+  PhysicalLink,
+  PhysicalPlan,
+  WorkflowContext,
+  WorkflowSettings
+}
+import 
org.apache.texera.amber.engine.architecture.scheduling.config.{OutputPortConfig,
 ResourceConfig}
+import org.apache.texera.amber.engine.architecture.scheduling.{
+  PreSchedulingHints,
+  Region,
+  RegionIdentity
+}
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde
+import org.jgrapht.Graph
+import org.jgrapht.alg.connectivity.BiconnectivityInspector
+
+import java.net.URI
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+  * Pre-scheduling cache step for forced-cache-reuse MVP semantics.
+  *
+  * This step runs before CostBasedScheduleGenerator and only prepares 
planning inputs:
+  * residual planning plan, workflow settings override, precomputed URI/config 
hints, and leading
+  * skipped regions. The final schedule is still generated by 
CostBasedScheduleGenerator.
+  */
+object CacheReusePreSchedulingStep {
+
+  /**
+    * Output of the pre-scheduling phase consumed by 
CostBasedScheduleGenerator.
+    */
+  case class CacheReusePreSchedulingResult(
+      planningPhysicalPlan: PhysicalPlan,
+      planningWorkflowSettings: WorkflowSettings,
+      planningHints: PreSchedulingHints,
+      leadingRegions: Set[Region]
+  )
+
+  /**
+    * Deterministic requiredness result under forced-cache semantics.
+    */
+  private case class RequirednessAnalysis(
+      requiredSeedPorts: Set[GlobalPortIdentity],
+      executeOperators: Set[PhysicalOpIdentity],
+      freshRequiredOutputPorts: Set[GlobalPortIdentity],
+      cacheHitRequiredOutputPorts: Set[GlobalPortIdentity],
+      freshRequiredLinks: Set[PhysicalLink],
+      cacheFedLinks: Set[PhysicalLink]
+  )
+
+  /**
+    * Prepare cache-aware planning inputs for CostBasedScheduleGenerator.
+    */
+  def prepare(
+      workflowContext: WorkflowContext,
+      physicalPlan: PhysicalPlan
+  ): CacheReusePreSchedulingResult = {
+    val cachedOutputsByPort = 
workflowContext.workflowSettings.cachedOutputs.map {
+      case (serializedId, cachedOutput) =>
+        GlobalPortIdentitySerde.deserializeFromString(serializedId) -> 
cachedOutput
+    }
+
+    if (cachedOutputsByPort.isEmpty) {
+      return CacheReusePreSchedulingResult(
+        planningPhysicalPlan = physicalPlan,
+        planningWorkflowSettings = workflowContext.workflowSettings,
+        planningHints = PreSchedulingHints(),
+        leadingRegions = Set.empty
+      )
+    }
+
+    val analysis = analyzeRequiredness(physicalPlan, workflowContext, 
cachedOutputsByPort)
+    val skipRegions = buildSkipRegions(physicalPlan, analysis, 
cachedOutputsByPort)
+    val residualPlan =
+      buildResidualPlan(physicalPlan, analysis.executeOperators, 
analysis.freshRequiredLinks)
+    val planningWorkflowSettings = workflowContext.workflowSettings.copy(
+      outputPortsNeedingStorage =
+        analysis.requiredSeedPorts.intersect(analysis.freshRequiredOutputPorts)
+    )
+    val planningHints = buildPlanningHints(residualPlan, analysis, 
cachedOutputsByPort)
+
+    CacheReusePreSchedulingResult(
+      planningPhysicalPlan = residualPlan,
+      planningWorkflowSettings = planningWorkflowSettings,
+      planningHints = planningHints,
+      leadingRegions = skipRegions
+    )
+  }
+
+  /**
+    * Backward requiredness propagation with forced cache reuse:
+    * - cache hit required outputs do not force execution,
+    * - cache miss required outputs force execution and upstream propagation.
+    */
+  private def analyzeRequiredness(
+      plan: PhysicalPlan,
+      workflowContext: WorkflowContext,
+      cachedOutputsByPort: Map[GlobalPortIdentity, CachedOutput]
+  ): RequirednessAnalysis = {
+    val requiredSeeds =
+      workflowContext.workflowSettings.outputPortsNeedingStorage
+        .filter(pid => plan.operators.exists(_.id == pid.opId))
+
+    val requiredOutputPorts = mutable.Set[GlobalPortIdentity]()
+    requiredOutputPorts ++= requiredSeeds
+    val executeOperators = mutable.Set[PhysicalOpIdentity]()
+
+    var changed = true
+    while (changed) {
+      changed = false
+      plan.topologicalIterator().foreach { opId =>
+        val op = plan.getOperator(opId)
+        val requiredOutputsOfOp =
+          op.outputPorts.keys
+            .map(portId => GlobalPortIdentity(op.id, portId))
+            .filter(requiredOutputPorts.contains)
+            .toSet
+        if (requiredOutputsOfOp.nonEmpty) {
+          val hasCacheMiss = requiredOutputsOfOp.exists(pid => 
!cachedOutputsByPort.contains(pid))
+          if (hasCacheMiss) {
+            if (!executeOperators.contains(opId)) {
+              executeOperators += opId
+              changed = true
+            }
+            plan.getUpstreamPhysicalLinks(opId).foreach { link =>
+              val upstreamOutput = GlobalPortIdentity(link.fromOpId, 
link.fromPortId)
+              if (!requiredOutputPorts.contains(upstreamOutput)) {
+                requiredOutputPorts += upstreamOutput
+                changed = true
+              }
+            }
+          }
+        }
+      }
+    }
+
+    val freshRequiredOutputPorts =
+      requiredOutputPorts.filter(pid => 
!cachedOutputsByPort.contains(pid)).toSet
+    val cacheHitRequiredOutputPorts =
+      requiredOutputPorts.filter(cachedOutputsByPort.contains).toSet
+
+    val linksIntoExecute = plan.links.filter(link => 
executeOperators.contains(link.toOpId))
+    val freshRequiredLinks = linksIntoExecute
+      .filter(link =>
+        freshRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, 
link.fromPortId))
+      )
+      .toSet
+    val cacheFedLinks = linksIntoExecute
+      .filter(link =>
+        cacheHitRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, 
link.fromPortId))
+      )
+      .toSet
+
+    RequirednessAnalysis(
+      requiredSeedPorts = requiredSeeds,
+      executeOperators = executeOperators.toSet,
+      freshRequiredOutputPorts = freshRequiredOutputPorts,
+      cacheHitRequiredOutputPorts = cacheHitRequiredOutputPorts,
+      freshRequiredLinks = freshRequiredLinks,
+      cacheFedLinks = cacheFedLinks
+    )
+  }
+
+  /**
+    * Keep only execute operators and fresh-required dependencies.
+    */
+  private def buildResidualPlan(
+      plan: PhysicalPlan,
+      executeOperators: Set[PhysicalOpIdentity],
+      freshRequiredLinks: Set[PhysicalLink]
+  ): PhysicalPlan = {
+    val linksToRemove = plan.links.diff(freshRequiredLinks)
+    val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => 
acc.removeLink(link))
+    linksPrunedPlan.getSubPlan(executeOperators)
+  }
+
+  /**
+    * Prepare generic planning hints for CostBasedScheduleGenerator.
+    */
+  private def buildPlanningHints(
+      residualPlan: PhysicalPlan,
+      analysis: RequirednessAnalysis,
+      cachedOutputsByPort: Map[GlobalPortIdentity, CachedOutput]
+  ): PreSchedulingHints = {
+    val residualOps = residualPlan.operators.map(_.id).toSet
+
+    val outputPortConfigOverrides = analysis.cacheHitRequiredOutputPorts
+      .filter(pid => residualOps.contains(pid.opId))
+      .flatMap { pid =>
+        cachedOutputsByPort.get(pid).map { cached =>
+          pid -> OutputPortConfig(
+            storageURI = cached.resultUri,
+            cachedTupleCount = cached.tupleCount,
+            materialize = false
+          )
+        }
+      }
+      .toMap
+
+    val inputPortUriOverrides = analysis.cacheFedLinks
+      .filter(link => residualOps.contains(link.toOpId))
+      .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) {
+        case (acc, link) =>
+          val inputPort = GlobalPortIdentity(link.toOpId, link.toPortId, input 
= true)
+          val outputPort = GlobalPortIdentity(link.fromOpId, link.fromPortId)
+          cachedOutputsByPort.get(outputPort) match {
+            case Some(cached) =>
+              val uris = acc.getOrElse(inputPort, List.empty) :+ 
cached.resultUri
+              acc.updated(inputPort, uris)
+            case None =>
+              acc
+          }
+      }
+
+    PreSchedulingHints(outputPortConfigOverrides, inputPortUriOverrides)
+  }
+
+  /**
+    * Build skipped regions (outside Pasta scope) with reuse-only output-port 
bindings.
+    */
+  private def buildSkipRegions(
+      plan: PhysicalPlan,
+      analysis: RequirednessAnalysis,
+      cachedOutputsByPort: Map[GlobalPortIdentity, CachedOutput]
+  ): Set[Region] = {
+    val skipOperators = 
plan.operators.map(_.id).diff(analysis.executeOperators)
+    if (skipOperators.isEmpty) {
+      return Set.empty
+    }
+
+    val skipInternalLinks = plan.links
+      .filter(link => skipOperators.contains(link.fromOpId) && 
skipOperators.contains(link.toOpId))
+    val linksToRemove = plan.links.diff(skipInternalLinks)
+    val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => 
acc.removeLink(link))
+    val skipPlan = linksPrunedPlan.getSubPlan(skipOperators)
+
+    createConnectedRegions(skipPlan).map { region =>
+      val reuseOnlyOutputConfigs = analysis.cacheHitRequiredOutputPorts
+        .filter(pid => region.physicalOps.exists(_.id == pid.opId))
+        .flatMap { outputPort =>
+          cachedOutputsByPort.get(outputPort).map { cached =>
+            outputPort -> OutputPortConfig(
+              storageURI = cached.resultUri,
+              cachedTupleCount = cached.tupleCount,
+              materialize = false
+            )
+          }
+        }
+        .toMap
+      region.copy(
+        cached = true,
+        resourceConfig = Some(ResourceConfig(portConfigs = 
reuseOnlyOutputConfigs))
+      )
+    }
+  }
+
+  /**
+    * Region skeleton partitioning without materialized-edge cuts.
+    */
+  private def createConnectedRegions(plan: PhysicalPlan): Set[Region] = {
+    val connectedComponents: Set[Graph[PhysicalOpIdentity, PhysicalLink]] =
+      new BiconnectivityInspector[PhysicalOpIdentity, PhysicalLink](
+        plan.dag
+      ).getConnectedComponents.asScala.toSet
+
+    connectedComponents.zipWithIndex.map {
+      case (connectedSubDAG, idx) =>
+        val operators: Set[PhysicalOpIdentity] = 
connectedSubDAG.vertexSet().asScala.toSet
+        val links: Set[PhysicalLink] = operators
+          .flatMap(opId => plan.getUpstreamPhysicalLinks(opId) ++ 
plan.getDownstreamPhysicalLinks(opId))
+          .filter(link => operators.contains(link.fromOpId))
+        val physicalOps = operators.map(plan.getOperator)
+        val ports: Set[GlobalPortIdentity] = physicalOps.flatMap { op =>
+          op.inputPorts.keys
+            .map(inputPortId => GlobalPortIdentity(op.id, inputPortId, input = 
true))
+            .toSet ++
+            op.outputPorts.keys.map(outputPortId => GlobalPortIdentity(op.id, 
outputPortId)).toSet
+        }
+        Region(
+          id = RegionIdentity(idx.toLong),
+          physicalOps = physicalOps,
+          physicalLinks = links,
+          ports = ports,
+          resourceConfig = None,
+          cached = true
+        )
+    }
+  }
+
+}
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
index 9dcf3ad4bf..a4cc8eded7 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -40,16 +40,24 @@ class WorkflowScheduler(
     * Update the schedule to be executed, based on the given physicalPlan.
     */
   def updateSchedule(physicalPlan: PhysicalPlan): Unit = {
-    // generate a schedule using a region plan generator.
-    val (generatedSchedule, updatedPhysicalPlan) =
-      // CostBasedRegionPlanGenerator considers costs to try to find an 
optimal plan.
-      new CostBasedScheduleGenerator(
-        workflowContext,
-        physicalPlan,
-        actorId
-      ).generate()
-    this.schedule = generatedSchedule
-    this.physicalPlan = updatedPhysicalPlan
+    // Keep the full immutable physical plan on scheduler/controller side.
+    this.physicalPlan = physicalPlan
+    val preScheduling = CacheReusePreSchedulingStep.prepare(
+      workflowContext,
+      physicalPlan
+    )
+    val planningWorkflowContext = new WorkflowContext(
+      workflowContext.workflowId,
+      workflowContext.executionId,
+      preScheduling.planningWorkflowSettings
+    )
+    this.schedule = new CostBasedScheduleGenerator(
+      planningWorkflowContext,
+      preScheduling.planningPhysicalPlan,
+      actorId,
+      preScheduling.planningHints,
+      preScheduling.leadingRegions
+    ).generate()._1
   }
 
   def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else 
schedule.next()
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index fc4a54c0f9..19ef60c91c 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -30,7 +30,6 @@ import 
org.apache.texera.amber.engine.architecture.scheduling.config.{
   ResourceConfig
 }
 import org.apache.texera.amber.engine.common.AmberLogging
-import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde
 import org.jgrapht.Graph
 import org.jgrapht.alg.connectivity.BiconnectivityInspector
 import org.jgrapht.graph.{DirectedAcyclicGraph, DirectedPseudograph}
@@ -48,7 +47,9 @@ import scala.util.{Failure, Success, Try}
 class CostBasedScheduleGenerator(
     workflowContext: WorkflowContext,
     initialPhysicalPlan: PhysicalPlan,
-    val actorId: ActorVirtualIdentity
+    val actorId: ActorVirtualIdentity,
+    planningHints: PreSchedulingHints = PreSchedulingHints(),
+    leadingRegions: Set[Region] = Set.empty
 ) extends ScheduleGenerator(
       workflowContext,
       initialPhysicalPlan
@@ -63,19 +64,6 @@ class CostBasedScheduleGenerator(
       numStatesExplored: Int = 0
   )
 
-  /**
-    * Deterministic cache-aware propagation result under Assumption III 
(forced cache use).
-    */
-  private case class RequirednessAnalysis(
-      requiredSeedPorts: Set[GlobalPortIdentity],
-      requiredOutputPorts: Set[GlobalPortIdentity],
-      executeOperators: Set[PhysicalOpIdentity],
-      freshRequiredOutputPorts: Set[GlobalPortIdentity],
-      cacheHitRequiredOutputPorts: Set[GlobalPortIdentity],
-      freshRequiredLinks: Set[PhysicalLink],
-      cacheFedLinks: Set[PhysicalLink]
-  )
-
   private val costEstimator =
     new DefaultCostEstimator(
       workflowContext = workflowContext,
@@ -83,36 +71,6 @@ class CostBasedScheduleGenerator(
       actorId = actorId
     )
 
-  private val cachedOutputsByPort: Map[GlobalPortIdentity, CachedOutput] =
-    workflowContext.workflowSettings.cachedOutputs.map {
-      case (serializedId, cachedOutput) =>
-        GlobalPortIdentitySerde.deserializeFromString(serializedId) -> 
cachedOutput
-    }
-
-  /**
-    * Search-time planning bindings applied during region construction.
-    *
-    * @param visibleFreshOutputPorts visible/sink output ports that must be 
freshly materialized.
-    * @param reuseOnlyOutputsByPort output ports that are required but must be 
served from cache (no rematerialization).
-    * @param cacheFedInputUrisByPort input ports pre-bound to cached upstream 
URIs.
-    */
-  private case class PlanningBindings(
-      visibleFreshOutputPorts: Set[GlobalPortIdentity],
-      reuseOnlyOutputsByPort: Map[GlobalPortIdentity, CachedOutput],
-      cacheFedInputUrisByPort: Map[GlobalPortIdentity, List[URI]]
-  )
-
-  private object PlanningBindings {
-    val empty: PlanningBindings = PlanningBindings(
-      visibleFreshOutputPorts = Set.empty,
-      reuseOnlyOutputsByPort = Map.empty,
-      cacheFedInputUrisByPort = Map.empty
-    )
-  }
-
-  private var activePlanningPlan: PhysicalPlan = initialPhysicalPlan
-  private var activePlanningBindings: PlanningBindings = PlanningBindings.empty
-
   private case class CostEstimatorMemoKey(
       physicalOpIds: Set[PhysicalOpIdentity],
       physicalLinkIds: Set[PhysicalLink],
@@ -126,269 +84,102 @@ class CostBasedScheduleGenerator(
 
   def generate(): (Schedule, PhysicalPlan) = {
     val startTime = System.nanoTime()
-    val analysis = analyzeRequiredness(initialPhysicalPlan)
-
-    val executeRegionDAG = new DirectedAcyclicGraph[Region, 
RegionLink](classOf[RegionLink])
-    if (analysis.executeOperators.nonEmpty) {
-      val residualPlan = buildResidualPlan(
-        initialPhysicalPlan,
-        analysis.executeOperators,
-        analysis.freshRequiredLinks
-      )
-      val residualBindings = buildResidualPlanningBindings(residualPlan, 
analysis)
-      val searchResult = searchOnPlan(residualPlan, residualBindings)
-      searchResult.regionDAG.vertexSet().forEach(region => 
executeRegionDAG.addVertex(region))
-      searchResult.regionDAG
-        .edgeSet()
-        .forEach(edge =>
-          executeRegionDAG.addEdge(
-            searchResult.regionDAG.getEdgeSource(edge),
-            searchResult.regionDAG.getEdgeTarget(edge),
-            edge
-          )
-        )
-    }
-
-    val skipRegions = buildSkipRegions(initialPhysicalPlan, analysis)
-    val finalRegionPlan = buildFinalRegionPlan(executeRegionDAG, skipRegions)
+    val regionDAG = createRegionDAG()
     val totalRPGTime = System.nanoTime() - startTime
-    val schedule = generateScheduleFromRegionPlan(finalRegionPlan)
+    val regionPlan = RegionPlan(
+      regions = regionDAG.iterator().asScala.toSet,
+      regionLinks = regionDAG.edgeSet().asScala.toSet
+    )
+    val executeOnlySchedule = generateScheduleFromRegionPlan(regionPlan)
+    val schedule =
+      if (leadingRegions.nonEmpty) 
composeWithLeadingRegions(executeOnlySchedule, leadingRegions)
+      else executeOnlySchedule
     logger.info(
       s"WID: ${workflowContext.workflowId.id}, EID: 
${workflowContext.executionId.id}, total RPG time: " +
         s"${totalRPGTime / 1e6} ms."
     )
     (
       schedule,
-      initialPhysicalPlan
+      physicalPlan
     )
   }
 
   /**
-    * Computes deterministic requiredness and execute/skip decisions under 
Assumption III (forced cache use).
+    * Merge externally-prepared leading regions (e.g., skipped cached regions) 
ahead of the schedule
+    * generated by Pasta over the planning physical plan.
     */
-  private def analyzeRequiredness(plan: PhysicalPlan): RequirednessAnalysis = {
-    val requiredSeeds =
-      workflowContext.workflowSettings.outputPortsNeedingStorage
-        .filter(pid => plan.operators.exists(_.id == pid.opId))
-
-    val requiredOutputPorts = mutable.Set[GlobalPortIdentity]()
-    requiredOutputPorts ++= requiredSeeds
-    val executeOperators = mutable.Set[PhysicalOpIdentity]()
-
-    var changed = true
-    while (changed) {
-      changed = false
-      plan.topologicalIterator().foreach { opId =>
-        val op = plan.getOperator(opId)
-        val requiredOutputsOfOp =
-          op.outputPorts.keys
-            .map(portId => GlobalPortIdentity(op.id, portId))
-            .filter(requiredOutputPorts.contains)
-            .toSet
-        if (requiredOutputsOfOp.nonEmpty) {
-          val hasCacheMiss = requiredOutputsOfOp.exists(pid => 
!cachedOutputsByPort.contains(pid))
-          if (hasCacheMiss) {
-            if (!executeOperators.contains(opId)) {
-              executeOperators += opId
-              changed = true
-            }
-            plan.getUpstreamPhysicalLinks(opId).foreach { link =>
-              val upstreamOutput = GlobalPortIdentity(link.fromOpId, 
link.fromPortId)
-              if (!requiredOutputPorts.contains(upstreamOutput)) {
-                requiredOutputPorts += upstreamOutput
-                changed = true
-              }
-            }
-          }
-        }
-      }
-    }
-
-    val freshRequiredOutputPorts =
-      requiredOutputPorts.filter(pid => 
!cachedOutputsByPort.contains(pid)).toSet
-    val cacheHitRequiredOutputPorts =
-      requiredOutputPorts.filter(cachedOutputsByPort.contains).toSet
-    val linksIntoExecute = plan.links.filter(link => 
executeOperators.contains(link.toOpId))
-    val freshRequiredLinks = linksIntoExecute
-      .filter(link =>
-        freshRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, 
link.fromPortId))
-      )
-      .toSet
-    val cacheFedLinks = linksIntoExecute
-      .filter(link =>
-        cacheHitRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, 
link.fromPortId))
-      )
-      .toSet
-
-    RequirednessAnalysis(
-      requiredSeedPorts = requiredSeeds,
-      requiredOutputPorts = requiredOutputPorts.toSet,
-      executeOperators = executeOperators.toSet,
-      freshRequiredOutputPorts = freshRequiredOutputPorts,
-      cacheHitRequiredOutputPorts = cacheHitRequiredOutputPorts,
-      freshRequiredLinks = freshRequiredLinks,
-      cacheFedLinks = cacheFedLinks
-    )
-  }
-
-  /**
-    * Builds a residual plan containing only execute operators and 
fresh-required dependencies.
-    */
-  private def buildResidualPlan(
-      plan: PhysicalPlan,
-      executeOperators: Set[PhysicalOpIdentity],
-      freshRequiredLinks: Set[PhysicalLink]
-  ): PhysicalPlan = {
-    val linksToRemove = plan.links.diff(freshRequiredLinks)
-    val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => 
acc.removeLink(link))
-    linksPrunedPlan.getSubPlan(executeOperators)
-  }
-
-  /**
-    * Builds the search-time binding context for residual Pasta planning.
-    * All bindings are prepared before search and consumed directly by 
createRegions.
-    */
-  private def buildResidualPlanningBindings(
-      residualPlan: PhysicalPlan,
-      analysis: RequirednessAnalysis
-  ): PlanningBindings = {
-    val residualOps = residualPlan.operators.map(_.id).toSet
-
-    val visibleFreshOutputPorts = analysis.requiredSeedPorts
-      .intersect(analysis.freshRequiredOutputPorts)
-      .filter(pid => residualOps.contains(pid.opId))
-
-    val reuseOnlyOutputsByPort = analysis.cacheHitRequiredOutputPorts
-      .filter(pid => residualOps.contains(pid.opId))
-      .flatMap(pid => cachedOutputsByPort.get(pid).map(cached => pid -> 
cached))
-      .toMap
-
-    val cacheFedInputUrisByPort = analysis.cacheFedLinks
-      .filter(link => residualOps.contains(link.toOpId))
-      .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) {
-        case (acc, link) =>
-          val inputPort = GlobalPortIdentity(link.toOpId, link.toPortId, input 
= true)
-          val outputPort = GlobalPortIdentity(link.fromOpId, link.fromPortId)
-          cachedOutputsByPort.get(outputPort) match {
-            case Some(cached) =>
-              val uris = acc.getOrElse(inputPort, List.empty) :+ 
cached.resultUri
-              acc.updated(inputPort, uris)
-            case None =>
-              acc
-          }
-      }
-
-    PlanningBindings(
-      visibleFreshOutputPorts = visibleFreshOutputPorts,
-      reuseOnlyOutputsByPort = reuseOnlyOutputsByPort,
-      cacheFedInputUrisByPort = cacheFedInputUrisByPort
-    )
-  }
+  private def composeWithLeadingRegions(
+      executeOnlySchedule: Schedule,
+      leadingRegions: Set[Region]
+  ): Schedule = {
+    val executeLevels = drainScheduleLevels(executeOnlySchedule)
+    val leadingOrdered = leadingRegions.toSeq.sortBy(_.id.id)
+    val executeOrdered = executeLevels.flatten.toSet.toSeq.sortBy((region: 
Region) => region.id.id)
+    val orderedRegions = leadingOrdered ++ executeOrdered
+    val remappedRegionByRegion = orderedRegions.zipWithIndex.map {
+      case (region, idx) =>
+        region -> region.copy(id = RegionIdentity(idx.toLong))
+    }.toMap
 
-  /**
-    * Runs the original Pasta search on a provided plan with precomputed 
planning bindings.
-    */
-  private def searchOnPlan(
-      plan: PhysicalPlan,
-      planningBindings: PlanningBindings
-  ): SearchResult = {
-    val oldPlan = activePlanningPlan
-    val oldBindings = activePlanningBindings
-    try {
-      activePlanningPlan = plan
-      activePlanningBindings = planningBindings
-      costEstimatorMemoization.clear()
-      runSearchWithTimeout()
-    } finally {
-      activePlanningPlan = oldPlan
-      activePlanningBindings = oldBindings
+    val levelSets = mutable.Map.empty[Int, Set[Region]]
+    var nextLevel = 0
+    if (leadingOrdered.nonEmpty) {
+      levelSets(nextLevel) = leadingOrdered.map(remappedRegionByRegion).toSet
+      nextLevel += 1
     }
-  }
 
-  /**
-    * Builds skip regions over operators excluded from residual Pasta planning.
-    */
-  private def buildSkipRegions(
-      plan: PhysicalPlan,
-      analysis: RequirednessAnalysis
-  ): Set[Region] = {
-    val skipOperators = 
plan.operators.map(_.id).diff(analysis.executeOperators)
-    if (skipOperators.isEmpty) {
-      return Set.empty
-    }
-    val skipInternalLinks = plan.links
-      .filter(link => skipOperators.contains(link.fromOpId) && 
skipOperators.contains(link.toOpId))
-    val linksToRemove = plan.links.diff(skipInternalLinks)
-    val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => 
acc.removeLink(link))
-    val skipPlan = linksPrunedPlan.getSubPlan(skipOperators)
-    val skipRegionSkeletons = createRegions(skipPlan, Set.empty)
-    skipRegionSkeletons.map { region =>
-      val reuseOnlyOutputConfigs = analysis.cacheHitRequiredOutputPorts
-        .filter(pid => region.physicalOps.exists(_.id == pid.opId))
-        .flatMap { outputPort =>
-          cachedOutputsByPort.get(outputPort).map { cached =>
-            outputPort -> OutputPortConfig(
-              storageURI = cached.resultUri,
-              cachedTupleCount = cached.tupleCount,
-              materialize = false
-            )
-          }
-        }
-        .toMap
-      region.copy(
-        cached = true,
-        resourceConfig = Some(ResourceConfig(portConfigs = 
reuseOnlyOutputConfigs))
-      )
+    executeLevels.foreach { level =>
+      if (level.nonEmpty) {
+        levelSets(nextLevel) = level.map(remappedRegionByRegion)
+        nextLevel += 1
+      }
     }
+    Schedule(levelSets.toMap)
   }
 
-  /**
-    * Produces the final full region plan by combining skip regions and 
execute regions
-    * (from residual Pasta), then reindexing region IDs with skipped regions 
first.
-    */
-  private def buildFinalRegionPlan(
-      executeRegionDAG: DirectedAcyclicGraph[Region, RegionLink],
-      skipRegions: Set[Region]
-  ): RegionPlan = {
-    val executeRegions = executeRegionDAG.vertexSet().asScala.toSet
-    val executeLinks = executeRegionDAG.edgeSet().asScala.toSet
-    val allRegions = executeRegions ++ skipRegions
-    val orderedRegions = allRegions.toSeq.sortBy(region =>
-      (if (skipRegions.contains(region)) 0 else 1, region.id.id)
-    )
-    val remappedRegionByRegion = orderedRegions.zipWithIndex.map {
-      case (region, idx) => region -> region.copy(id = 
RegionIdentity(idx.toLong))
-    }.toMap
-    val executeRegionById = executeRegions.map(region => region.id -> 
region).toMap
-
-    val remappedRegions = remappedRegionByRegion.values.toSet
-    val remappedExecuteLinks = executeLinks.map { link =>
-      val fromRegion = executeRegionById(link.fromRegionId)
-      val toRegion = executeRegionById(link.toRegionId)
-      RegionLink(
-        remappedRegionByRegion(fromRegion).id,
-        remappedRegionByRegion(toRegion).id
-      )
+  private def drainScheduleLevels(schedule: Schedule): Vector[Set[Region]] = {
+    val levels = mutable.ArrayBuffer.empty[Set[Region]]
+    while (schedule.hasNext) {
+      levels += schedule.next()
     }
-    RegionPlan(remappedRegions, remappedExecuteLinks)
+    levels.toVector
   }
 
   /**
-    * Partitions a physical plan into Regions and assigns search-time port 
bindings in two passes.
+    * Partitions a physical plan into Regions and assigns storage URIs in two 
passes.
     *
-    * Pass 1 assigns output bindings for:
-    * - outputs of materialized edges in the current search state,
-    * - visible required outputs that must be freshly produced,
-    * - reuse-only required outputs that must bind to cached URIs.
+    * <p><strong>Overview</strong></p>
+    * <ol>
+    *   <li><strong>Region construction:</strong>
+    *     Remove all materialized edges from the DAG and compute undirected 
connected
+    *     components. The resulting “Region Graph” may contain directed 
cycles.</li>
+    *   <li><strong>Pass 1 – Output URIs:</strong>
+    *     For each Region, allocate storage URIs on every output port of 
materialized edges.</li>
+    *   <li><strong>Pass 2 – Input URIs:</strong>
+    *     Re-traverse the same Regions and attach reader URIs on input ports 
using
+    *     the URIs created in Pass 1.</li>
+    * </ol>
     *
-    * Pass 2 builds input bindings by wiring:
-    * - materialized-edge reader URIs from pass 1,
-    * - cache-fed input URIs precomputed during requiredness analysis.
+    * <p><strong>Why two passes?</strong></p>
+    * <ul>
+    *   <li>Potential directed cycles in the Region Graph makes a topological
+    *       traversal of regions inpossible.</li>
+    *   <li>To ensure every output URI exists before its corresponding reader 
is assigned,
+    *       and avoiding “reader before writer” holes, two passes are 
required.</li>
+    * </ul>
+    *
+    * @param physicalPlan the original physical plan (without materializations)
+    * @param matEdges     edges to be materialized (including blocking edges)
+    * @return a set of `Region`s whose `ResourceConfig` contains only `URI`s 
for `PortConfig`s
+    *         (`Partitioning` to be assigned later in `ResourceAllocator`; see 
`IntermediateInputPortConfig`.)
     */
   private def createRegions(
       physicalPlan: PhysicalPlan,
       matEdges: Set[PhysicalLink]
   ): Set[Region] = {
+
+    // Pass 0 – remove materialized edges and create connected components
+
     val matEdgesRemovedDAG: PhysicalPlan = 
matEdges.foldLeft(physicalPlan)(_.removeLink(_))
 
     val connectedComponents: Set[Graph[PhysicalOpIdentity, PhysicalLink]] =
@@ -396,8 +187,12 @@ class CostBasedScheduleGenerator(
         matEdgesRemovedDAG.dag
       ).getConnectedComponents.asScala.toSet
 
-    val regionsWithOutputBindings: Set[Region] = 
connectedComponents.zipWithIndex.map {
+    // Pass 1 – build Regions only output-port storage URIs
+
+    val regionsWithOnlyOutputPortURIs: Set[Region] = 
connectedComponents.zipWithIndex.map {
       case (connectedSubDAG, idx) =>
+        // Operators and intra‑region pipelined links
+
         val operators: Set[PhysicalOpIdentity] = 
connectedSubDAG.vertexSet().asScala.toSet
 
         val links: Set[PhysicalLink] = operators
@@ -410,6 +205,41 @@ class CostBasedScheduleGenerator(
 
         val physicalOps: Set[PhysicalOp] = 
operators.map(physicalPlan.getOperator)
 
+        // Frontend-specified ports that need to be materailized (output ports 
of "eye-icon" physicalOps)
+        val outputPortIdsToViewResult: Set[GlobalPortIdentity] =
+          workflowContext.workflowSettings.outputPortsNeedingStorage
+            .filter(pid => operators.contains(pid.opId))
+
+        // Contains both frontend-specified and scheduler-decided ports that 
require materailizations.
+        val overrideOutputPortIds: Set[GlobalPortIdentity] =
+          planningHints.outputPortConfigOverrides.keySet.filter(pid => 
operators.contains(pid.opId))
+
+        val outputPortIdsNeedingStorage: Set[GlobalPortIdentity] =
+          matEdges
+            .filter(e => operators.contains(e.fromOpId))
+            .map(e => GlobalPortIdentity(e.fromOpId, e.fromPortId)) ++
+            outputPortIdsToViewResult ++
+            overrideOutputPortIds
+
+        // Allocate an URI for each of these output ports
+        val outputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] =
+          outputPortIdsNeedingStorage.map { gpid =>
+            val outputConfig = 
planningHints.outputPortConfigOverrides.getOrElse(
+              gpid,
+              OutputPortConfig(
+                createResultURI(
+                  workflowId = workflowContext.workflowId,
+                  executionId = workflowContext.executionId,
+                  globalPortId = gpid
+                )
+              )
+            )
+            gpid -> outputConfig
+          }.toMap
+
+        val resourceConfig = ResourceConfig(portConfigs = outputPortConfigs)
+
+        // Enumerate all ports belonging to the Region
         val ports: Set[GlobalPortIdentity] = physicalOps.flatMap { op =>
           op.inputPorts.keys
             .map(inputPortId => GlobalPortIdentity(op.id, inputPortId, input = 
true))
@@ -418,102 +248,87 @@ class CostBasedScheduleGenerator(
             .toSet
         }
 
-        val outputPortIdsFromMatEdges = matEdges
-          .filter(edge => operators.contains(edge.fromOpId))
-          .map(edge => GlobalPortIdentity(edge.fromOpId, edge.fromPortId))
-        val visibleFreshOutputPortIds = 
activePlanningBindings.visibleFreshOutputPorts
-          .filter(portId => operators.contains(portId.opId))
-        val reuseOnlyOutputPortIds = 
activePlanningBindings.reuseOnlyOutputsByPort.keySet
-          .filter(portId => operators.contains(portId.opId))
-        val outputPortIdsNeedingBindings =
-          outputPortIdsFromMatEdges ++ visibleFreshOutputPortIds ++ 
reuseOnlyOutputPortIds
-
-        val outputPortConfigs = outputPortIdsNeedingBindings.map { 
outputPortId =>
-          activePlanningBindings.reuseOnlyOutputsByPort.get(outputPortId) 
match {
-            case Some(cachedOutput) =>
-              outputPortId -> OutputPortConfig(
-                storageURI = cachedOutput.resultUri,
-                cachedTupleCount = cachedOutput.tupleCount,
-                materialize = false
-              )
-            case None =>
-              outputPortId -> OutputPortConfig(
-                storageURI = createResultURI(
-                  workflowId = workflowContext.workflowId,
-                  executionId = workflowContext.executionId,
-                  globalPortId = outputPortId
-                ),
-                cachedTupleCount = None,
-                materialize = true
-              )
-          }
-        }.toMap
-
-        val resourceConfig =
-          if (outputPortConfigs.nonEmpty) Some(ResourceConfig(portConfigs = 
outputPortConfigs))
-          else None
-
+        // Build the Region skeleton (no input‑port URIs yet)
         Region(
           id = RegionIdentity(idx),
           physicalOps = physicalOps,
           physicalLinks = links,
           ports = ports,
-          resourceConfig = resourceConfig,
-          cached = false
+          resourceConfig = Some(resourceConfig)
         )
     }
 
-    val regionByOperator = regionsWithOutputBindings
-      .flatMap(region => region.getOperators.map(op => op.id -> region))
-      .toMap
-
-    regionsWithOutputBindings.map { region =>
-      val inputUrisFromMatEdges = matEdges
-        .filter(edge => region.physicalOps.exists(_.id == edge.toOpId))
-        .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) {
-          case (acc, matEdge) =>
-            val globalInputPortId =
-              GlobalPortIdentity(matEdge.toOpId, matEdge.toPortId, input = 
true)
-            val globalOutputPortId = GlobalPortIdentity(matEdge.fromOpId, 
matEdge.fromPortId)
-            val inputReaderUri = regionByOperator(matEdge.fromOpId)
-              .resourceConfig
-              .get
-              .portConfigs(globalOutputPortId)
-              .storageURIs
-              .head
+    // Collect writer‑side configs so we can look them up in Pass 2
+    val allOutputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] =
+      regionsWithOnlyOutputPortURIs
+        .flatMap(_.resourceConfig) // Seq[ResourceConfig]
+        .flatMap(_.portConfigs.collect { // PortConfig → OutputPortConfig
+          case (id, cfg: OutputPortConfig) => id -> cfg
+        })
+        .toMap
+
+    // Pass 2 – add input‑port storage configs (reader URIs)
+
+    regionsWithOnlyOutputPortURIs.map { existingRegion =>
+      // MatEdges that originally connected to the input ports of this region.
+      val relevantMatEdges: Set[PhysicalLink] = matEdges.filter { matEdge =>
+        existingRegion.getOperators.exists(_.id == matEdge.toOpId)
+      }
+
+      // Assign storage URIs to input ports of each materialized edge (each 
input port could have more than one URI)
+      val inputUrisFromMatEdges: Map[GlobalPortIdentity, List[URI]] =
+        relevantMatEdges
+          .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) { (acc, link) =>
+            val globalOutputPortId = GlobalPortIdentity(link.fromOpId, 
link.fromPortId)
+            val globalInputPortId = GlobalPortIdentity(link.toOpId, 
link.toPortId, input = true)
+
+            // Writer‑side URI that must already exist thanks to Pass 1
+            val inputReaderURI = allOutputPortConfigs
+              .getOrElse(
+                globalOutputPortId,
+                throw new IllegalStateException(
+                  s"Materialization edge $link: attempting to assign a 
materialization " +
+                    s"reader URI for input port $globalInputPortId when " +
+                    s"the outout port $globalOutputPortId has not been 
assigned a URI yet."
+                )
+              )
+              .storageURI
+
+            // Group all available URIs of this input port together
             acc.updated(
               globalInputPortId,
-              acc.getOrElse(globalInputPortId, List.empty[URI]) :+ 
inputReaderUri
+              acc.getOrElse(globalInputPortId, List.empty[URI]) :+ 
inputReaderURI
             )
+          }
+
+      val inputUrisFromOverrides: Map[GlobalPortIdentity, List[URI]] =
+        planningHints.inputPortUriOverrides.filter { case (inputPortId, _) =>
+          existingRegion.getPorts.contains(inputPortId)
         }
 
-      val cacheFedInputUris = activePlanningBindings.cacheFedInputUrisByPort
-        .filter { case (inputPortId, _) => region.ports.contains(inputPortId) }
-        .toMap
+      val mergedInputUris: Map[GlobalPortIdentity, List[URI]] =
+        (inputUrisFromMatEdges.keySet ++ inputUrisFromOverrides.keySet).map { 
inputPortId =>
+          inputPortId -> (
+            inputUrisFromMatEdges.getOrElse(inputPortId, List.empty[URI]) ++
+              inputUrisFromOverrides.getOrElse(inputPortId, List.empty[URI])
+          )
+        }.toMap
 
-      val mergedInputUris = (inputUrisFromMatEdges.keySet ++ 
cacheFedInputUris.keySet)
-        .map { inputPortId =>
-          val uris =
-            inputUrisFromMatEdges.getOrElse(inputPortId, List.empty) ++
-              cacheFedInputUris.getOrElse(inputPortId, List.empty)
-          inputPortId -> uris
+      val inputPortConfigs: Map[GlobalPortIdentity, 
IntermediateInputPortConfig] =
+        mergedInputUris.map {
+          case (inputPortId, uris) =>
+            inputPortId -> IntermediateInputPortConfig(uris)
         }
-        .toMap
 
-      val inputPortConfigs = mergedInputUris.map {
-        case (inputPortId, uris) =>
-          inputPortId -> IntermediateInputPortConfig(uris)
+      val newResourceConfig: Option[ResourceConfig] = 
existingRegion.resourceConfig match {
+        case Some(existingConfig) =>
+          Some(ResourceConfig(portConfigs = existingConfig.portConfigs ++ 
inputPortConfigs))
+        case None =>
+          if (inputPortConfigs.nonEmpty) Some(ResourceConfig(portConfigs = 
inputPortConfigs))
+          else None
       }
 
-      val existingPortConfigs = 
region.resourceConfig.map(_.portConfigs).getOrElse(Map.empty)
-      val newResourceConfig =
-        if (inputPortConfigs.nonEmpty || existingPortConfigs.nonEmpty) {
-          Some(ResourceConfig(portConfigs = existingPortConfigs ++ 
inputPortConfigs))
-        } else {
-          None
-        }
-
-      region.copy(resourceConfig = newResourceConfig)
+      existingRegion.copy(resourceConfig = newResourceConfig)
     }
   }
 
@@ -530,7 +345,7 @@ class CostBasedScheduleGenerator(
     val regionDAG = new DirectedAcyclicGraph[Region, 
RegionLink](classOf[RegionLink])
     val regionGraph = new DirectedPseudograph[Region, 
RegionLink](classOf[RegionLink])
     val opToRegionMap = new mutable.HashMap[PhysicalOpIdentity, Region]
-    createRegions(activePlanningPlan, matEdges).foreach(region => {
+    createRegions(physicalPlan, matEdges).foreach(region => {
       region.getOperators.foreach(op => opToRegionMap(op.id) = region)
       regionGraph.addVertex(region)
       regionDAG.addVertex(region)
@@ -547,13 +362,17 @@ class CostBasedScheduleGenerator(
           isAcyclic = false
       }
     })
-    if (isAcyclic) Left(regionDAG) else Right(regionGraph)
+    if (isAcyclic) Left(regionDAG)
+    else Right(regionGraph)
   }
 
   /**
-    * Runs Pasta search with timeout handling and returns the selected search 
result.
+    * Performs a search to generate a region DAG.
+    * Materializations are added only after the plan is determined to be 
schedulable.
+    *
+    * @return A region DAG.
     */
-  private def runSearchWithTimeout(): SearchResult = {
+  private def createRegionDAG(): DirectedAcyclicGraph[Region, RegionLink] = {
     val searchResultFuture: Future[SearchResult] = Future {
       workflowContext.workflowSettings.executionMode match {
         case ExecutionMode.MATERIALIZED =>
@@ -587,7 +406,9 @@ class CostBasedScheduleGenerator(
       s"WID: ${workflowContext.workflowId.id}, EID: 
${workflowContext.executionId.id}, search time: " +
         s"${searchResult.searchTimeNanoSeconds / 1e6} ms."
     )
-    searchResult
+
+    val regionDAG = searchResult.regionDAG
+    regionDAG
   }
 
   /**
@@ -602,17 +423,17 @@ class CostBasedScheduleGenerator(
     */
   def bottomUpSearch(
       globalSearch: Boolean = false,
-      oChains: Boolean = false,
-      oCleanEdges: Boolean = false,
+      oChains: Boolean = true,
+      oCleanEdges: Boolean = true,
       oEarlyStop: Boolean = true
   ): SearchResult = {
     val startTime = System.nanoTime()
     val originalNonBlockingEdges =
       if (oCleanEdges) {
-        activePlanningPlan.getNonBridgeNonBlockingLinks
+        physicalPlan.getNonBridgeNonBlockingLinks
       } else {
-        activePlanningPlan.links.diff(
-          activePlanningPlan.getBlockingAndDependeeLinks
+        physicalPlan.links.diff(
+          physicalPlan.getBlockingAndDependeeLinks
         )
       }
     // Queue to hold states to be explored, starting with the empty set
@@ -643,7 +464,7 @@ class CostBasedScheduleGenerator(
         }
         visited.add(currentState)
         tryConnectRegionDAG(
-          activePlanningPlan.getBlockingAndDependeeLinks ++ currentState
+          physicalPlan.getBlockingAndDependeeLinks ++ currentState
         ) match {
           case Left(regionDAG) =>
             updateOptimumIfApplicable(regionDAG)
@@ -674,12 +495,12 @@ class CostBasedScheduleGenerator(
         */
       def addNeighborStatesToFrontier(): Unit = {
         val allCurrentMaterializedEdges =
-          currentState ++ activePlanningPlan.getBlockingAndDependeeLinks
+          currentState ++ physicalPlan.getBlockingAndDependeeLinks
         // Generate and enqueue all neighbour states that haven't been visited
         var candidateEdges = originalNonBlockingEdges
           .diff(currentState)
         if (oChains) {
-          val edgesInChainWithMaterializedEdges = activePlanningPlan.maxChains
+          val edgesInChainWithMaterializedEdges = physicalPlan.maxChains
             .filter(chain => 
chain.intersect(allCurrentMaterializedEdges).nonEmpty)
             .flatten
           candidateEdges = candidateEdges.diff(
@@ -710,7 +531,7 @@ class CostBasedScheduleGenerator(
           if (filteredNeighborStates.nonEmpty) {
             val minCostNeighborState = 
filteredNeighborStates.minBy(neighborState =>
               tryConnectRegionDAG(
-                activePlanningPlan.getBlockingAndDependeeLinks ++ neighborState
+                physicalPlan.getBlockingAndDependeeLinks ++ neighborState
               ) match {
                 case Left(regionDAG) =>
                   allocateResourcesAndEvaluateCost(regionDAG)
@@ -736,7 +557,7 @@ class CostBasedScheduleGenerator(
     val startTime = System.nanoTime()
 
     val (regionDAG, cost) =
-      tryConnectRegionDAG(activePlanningPlan.links) match {
+      tryConnectRegionDAG(physicalPlan.links) match {
         case Left(dag) => (dag, allocateResourcesAndEvaluateCost(dag))
         case Right(_) =>
           (
@@ -765,19 +586,19 @@ class CostBasedScheduleGenerator(
     */
   def topDownSearch(
       globalSearch: Boolean = false,
-      oChains: Boolean = false,
-      oCleanEdges: Boolean = false
+      oChains: Boolean = true,
+      oCleanEdges: Boolean = true
   ): SearchResult = {
     val startTime = System.nanoTime()
     // Starting from a state where all non-blocking edges are materialized
-    val originalSeedState = activePlanningPlan.links.diff(
-      activePlanningPlan.getBlockingAndDependeeLinks
+    val originalSeedState = physicalPlan.links.diff(
+      physicalPlan.getBlockingAndDependeeLinks
     )
 
     // Chain optimization: an edge in the same chain as a blocking edge should 
not be materialized
     val seedStateOptimizedByChainsIfApplicable = if (oChains) {
-      val edgesInChainWithBlockingEdge = activePlanningPlan.maxChains
-        .filter(chain => 
chain.intersect(activePlanningPlan.getBlockingAndDependeeLinks).nonEmpty)
+      val edgesInChainWithBlockingEdge = physicalPlan.maxChains
+        .filter(chain => 
chain.intersect(physicalPlan.getBlockingAndDependeeLinks).nonEmpty)
         .flatten
       originalSeedState.diff(edgesInChainWithBlockingEdge)
     } else {
@@ -786,7 +607,7 @@ class CostBasedScheduleGenerator(
 
     // Clean edge optimization: a clean edge should not be materialized
     val finalSeedState = if (oCleanEdges) {
-      
seedStateOptimizedByChainsIfApplicable.intersect(activePlanningPlan.getNonBridgeNonBlockingLinks)
+      
seedStateOptimizedByChainsIfApplicable.intersect(physicalPlan.getNonBridgeNonBlockingLinks)
     } else {
       seedStateOptimizedByChainsIfApplicable
     }
@@ -806,7 +627,7 @@ class CostBasedScheduleGenerator(
       val currentState = queue.dequeue()
       visited.add(currentState)
       tryConnectRegionDAG(
-        activePlanningPlan.getBlockingAndDependeeLinks ++ currentState
+        physicalPlan.getBlockingAndDependeeLinks ++ currentState
       ) match {
         case Left(regionDAG) =>
           updateOptimumIfApplicable(regionDAG)
@@ -849,7 +670,7 @@ class CostBasedScheduleGenerator(
           if (unvisitedNeighborStates.nonEmpty) {
             val minCostNeighborState = 
unvisitedNeighborStates.minBy(neighborState =>
               tryConnectRegionDAG(
-                activePlanningPlan.getBlockingAndDependeeLinks ++ neighborState
+                physicalPlan.getBlockingAndDependeeLinks ++ neighborState
               ) match {
                 case Left(regionDAG) =>
                   allocateResourcesAndEvaluateCost(regionDAG)
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala
index a37c389606..d86101a1f0 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala
@@ -19,17 +19,20 @@
 
 package org.apache.texera.amber.engine.architecture.scheduling
 
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.texera.amber.core.workflow.WorkflowContext
 import 
org.apache.texera.amber.engine.architecture.scheduling.DefaultCostEstimator.DEFAULT_OPERATOR_COST
-import org.apache.texera.amber.engine.architecture.scheduling.config.{
-  InputPortConfig,
-  IntermediateInputPortConfig,
-  OutputPortConfig,
-  ResourceConfig
-}
+import 
org.apache.texera.amber.engine.architecture.scheduling.config.ResourceConfig
 import 
org.apache.texera.amber.engine.architecture.scheduling.resourcePolicies.ResourceAllocator
 import org.apache.texera.amber.engine.common.AmberLogging
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.SqlServer.withTransaction
+import org.apache.texera.dao.jooq.generated.Tables.{WORKFLOW_EXECUTIONS, 
WORKFLOW_VERSION}
+
+import java.net.URI
+import scala.util.{Failure, Success, Try}
 
 /**
   * A cost estimator should estimate a cost of running a region under the 
given resource constraints as units.
@@ -62,27 +65,101 @@ class DefaultCostEstimator(
 ) extends CostEstimator
     with AmberLogging {
 
+  // Requires mysql database to retrieve execution statistics, otherwise use 
number of materialized ports as a default.
+  private val operatorEstimatedTimeOption = Try(
+    this.getOperatorExecutionTimeInSeconds(
+      this.workflowContext.workflowId.id
+    )
+  ) match {
+    case Failure(_)      => None
+    case Success(result) => result
+  }
+
+  operatorEstimatedTimeOption match {
+    case None =>
+      logger.info(
+        s"WID: ${workflowContext.workflowId.id}, EID: 
${workflowContext.executionId.id}, " +
+          s"no past execution statistics available. Using number of 
materialized output ports as the cost. "
+      )
+    case Some(_) =>
+  }
+
   override def allocateResourcesAndEstimateCost(
       region: Region,
       resourceUnits: Int
   ): (ResourceConfig, Double) = {
     // Currently the dummy cost from resourceAllocator is discarded.
     val (resourceConfig, _) = resourceAllocator.allocate(region)
-    val cost =
-      if (region.cached) {
-        0.0
-      } else {
-        val opCost = region.getOperators.size * DEFAULT_OPERATOR_COST
-        val writeCost = resourceConfig.portConfigs.values.collect {
-          case _: OutputPortConfig =>
-            0.5
-        }.sum
-        val readCost = resourceConfig.portConfigs.values.collect {
-          case _: InputPortConfig             => 0.5
-          case _: IntermediateInputPortConfig => 0.5
-        }.sum
-        opCost + writeCost + readCost
-      }
+    // We use a cost model that does not rely on the resource allocation.
+    // TODO: Once the ResourceAllocator actually calculates a cost, we can use 
its calculated cost.
+    val cost = this.operatorEstimatedTimeOption match {
+      case Some(operatorEstimatedTime) =>
+        // Use past statistics (wall-clock runtime). We use the execution time 
of the longest-running
+        // operator in each region to represent the region's execution time, 
and use the sum of all the regions'
+        // execution time as the wall-clock runtime of the workflow.
+        // This assumes a schedule is a total-order of the regions.
+        val opExecutionTimes = region.getOperators.map(op => {
+          operatorEstimatedTime.getOrElse(op.id.logicalOpId.id, 
DEFAULT_OPERATOR_COST)
+        })
+        val longestRunningOpExecutionTime = opExecutionTimes.max
+        longestRunningOpExecutionTime
+      case None =>
+        // Without past statistics (e.g., first execution), we use number of 
ports needing storage as the cost.
+        // Each port needing storage has a portConfig.
+        // This is independent of the schedule / resource allocator.
+        resourceConfig.portConfigs.size
+    }
     (resourceConfig, cost)
   }
+
+  /**
+    * Retrieve the latest successful execution to get statistics to calculate 
costs in DefaultCostEstimator.
+    * Using the total control processing time plus data processing time of an 
operator as its cost.
+    * If no past statistics are available (e.g., first execution), return None.
+    */
+  private def getOperatorExecutionTimeInSeconds(
+      wid: Long
+  ): Option[Map[String, Double]] = {
+
+    val uriString: String = withTransaction(
+      SqlServer
+        .getInstance()
+        .createDSLContext()
+    ) { context =>
+      context
+        .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
+        .from(WORKFLOW_EXECUTIONS)
+        .join(WORKFLOW_VERSION)
+        .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
+        .where(
+          WORKFLOW_VERSION.WID
+            .eq(wid.toInt)
+            .and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte))
+        )
+        .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc())
+        .limit(1)
+        .fetchOneInto(classOf[String])
+    }
+
+    if (uriString == null || uriString.isEmpty) {
+      None
+    } else {
+      val uri: URI = new URI(uriString)
+      val document = DocumentFactory.openDocument(uri)
+
+      val maxStats = document._1
+        .get()
+        .foldLeft(Map.empty[String, Double]) { (acc, tuple) =>
+          val record = tuple.asInstanceOf[Tuple]
+          val operatorId = record.getField(0).asInstanceOf[String]
+          val dataProcessingTime = record.getField(6).asInstanceOf[Long]
+          val controlProcessingTime = record.getField(7).asInstanceOf[Long]
+          val currentMaxTime = acc.getOrElse(operatorId, 0.0)
+          val newTime = (dataProcessingTime + controlProcessingTime) / 1e9
+          acc + (operatorId -> Math.max(currentMaxTime, newTime))
+        }
+
+      if (maxStats.isEmpty) None else Some(maxStats)
+    }
+  }
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/PreSchedulingHints.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/PreSchedulingHints.scala
new file mode 100644
index 0000000000..3f73bae62f
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/PreSchedulingHints.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling
+
+import org.apache.texera.amber.core.workflow.GlobalPortIdentity
+import 
org.apache.texera.amber.engine.architecture.scheduling.config.OutputPortConfig
+
+import java.net.URI
+
+/**
+  * Optional external planning hints for CostBasedScheduleGenerator.
+  *
+  * These hints are generic URI/config overrides and do not encode any 
cache-specific decisions.
+  * They are intended to be prepared by a pre-scheduling step and consumed 
during region creation
+  * before resource allocation/search evaluation.
+  *
+  * @param outputPortConfigOverrides output-port configs to override default 
writer URI allocation.
+  *                                  This can be used for reuse-only outputs 
(`materialize = false`).
+  * @param inputPortUriOverrides additional reader URIs for input ports, 
merged with scheduler-derived
+  *                              materialized-edge input URIs.
+  */
+case class PreSchedulingHints(
+    outputPortConfigOverrides: Map[GlobalPortIdentity, OutputPortConfig] = 
Map.empty,
+    inputPortUriOverrides: Map[GlobalPortIdentity, List[URI]] = Map.empty
+)
diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md
index ddf99413d6..7e4c27defd 100644
--- a/docs/operator-port-cache.md
+++ b/docs/operator-port-cache.md
@@ -9,7 +9,7 @@ Enable **incremental workflow execution** through deterministic 
cache reuse of o
 3. **Maintain correctness** via deterministic fingerprinting of upstream 
sub-DAGs
 4. **Preserve physical plan immutability**: skip/execute/cache/fresh are 
scheduler annotations, not plan mutations
 
-**Key principle (MVP)**: The physical plan remains unchanged. Scheduling is a 
two-step process in `CostBasedScheduleGenerator`: (1) deterministic 
requiredness propagation with forced cache use, then (2) original Pasta 
optimization on the residual fresh-required structure only.
+**Key principle (MVP)**: The physical plan remains unchanged. Scheduling is a 
two-step flow: (1) a controller pre-scheduling step computes deterministic 
requiredness and cache bindings, then (2) `CostBasedScheduleGenerator` (Pasta) 
runs on the residual fresh-required structure using those precomputed hints.
 
 **Future research goals**:
 
@@ -101,9 +101,12 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage 
collection are research
 - Convert to serialized form: `Map[String, CachedOutput]` (keyed by serialized 
`GlobalPortIdentity` to avoid Jackson map key deserialization issues)
 - Store in `WorkflowSettings.cachedOutputs` and pass to scheduler
 
-### 2. Scheduler Integration (Pasta / CostBasedScheduleGenerator)
+### 2. Scheduler Integration (Pre-Step + Pasta)
 
-**Location**: `CostBasedScheduleGenerator`
+**Location**:
+
+- Pre-step: `CacheReusePreSchedulingStep` (controller layer before scheduling)
+- Scheduler: `CostBasedScheduleGenerator` (original Pasta search on residual 
plan)
 
 **Inputs**:
 
@@ -113,7 +116,7 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage 
collection are research
 
 **MVP Scheduling Flow (Assumption III)**:
 
-1. **Deterministic requiredness propagation** (port/edge aware):
+1. **Controller pre-scheduling requiredness propagation** (port/edge aware):
    - Seed required outports with required sink + visible ports.
    - For each operator, if any required outport is cache miss, mark operator 
`Execute`; otherwise `Skip`.
    - For each `Execute` operator, mark all upstream outports on incoming edges 
as required.
@@ -121,11 +124,17 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage 
collection are research
 2. **Classify inputs of executing operators**:
    - `Cache-fed` if upstream required outport has cache hit.
    - `Fresh-required` if upstream required outport has cache miss.
-3. **Residual Pasta optimization**:
+3. **Residual planning construction**:
    - Build residual planning structure containing only `Execute` operators and 
`Fresh-required` dependencies.
+   - Build precomputed planning hints:
+     - output-port config overrides for reuse-only cache-hit required outputs 
(`materialize=false`),
+     - input-port URI overrides for cache-fed execute inputs.
+   - Build skipped (`ToSkip`) leading regions outside Pasta scope.
+4. **Residual Pasta optimization**:
    - Run original Pasta search/regioning/materialization on this residual 
structure.
    - Keep blocking-edge constraints within the residual scope.
-4. **Assemble final full schedule**:
+5. **Assemble final full schedule**:
+   - Prepend skipped regions before execute regions.
    - Include residual execute regions (`ToExecute`) and skipped regions 
(`ToSkip`) in one schedule.
    - Regions remain homogeneous (`cached=true/false`).
    - All URI bindings are fixed at planning time.
@@ -397,6 +406,7 @@ ExecutionCacheService ────→ upsertCachedOutput()     
OperatorPortCache
     - cache-hit required outputs are bound as reuse-only (`materialize=false`),
     - cache-fed inputs are pre-bound to cached URIs.
   - No post-search region re-annotation step; resource allocation remains in 
Pasta search (`allocateResourcesAndEvaluateCost`) and final execute-region 
input configs stay as `InputPortConfig`.
+  - Cache-aware orchestration is isolated in `CacheReusePreSchedulingStep` 
(controller layer before scheduling); `CostBasedScheduleGenerator` stays close 
to `main` and only consumes generic precomputed planning hints plus leading 
skipped regions.
 - **Runtime execution**: `RegionExecutionCoordinator` branches on 
`region.cached` flag:
   - ToSkip regions: `completeCachedRegion()` records cached operator metrics 
(numWorkers=0, processingTime=0) without creating workers, propagates cached 
URIs downstream
   - ToExecute regions: normal execution path

Reply via email to