This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 241b98c20284229e101f244f6232bd353f19e84d
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Dec 1 15:45:33 2025 -0800

    feat(cache): reuse cached URIs in scheduler and short-circuit cached regions during execution
---
 .../controller/ControllerProcessor.scala           |   3 +-
 .../WorkerExecutionCompletedHandler.scala          |   8 +-
 .../scheduling/CostBasedScheduleGenerator.scala    | 262 ++++++++++++---------
 .../engine/architecture/scheduling/Region.scala    |   3 +-
 .../scheduling/RegionExecutionCoordinator.scala    | 103 ++++++--
 .../scheduling/WorkflowExecutionCoordinator.scala  |  72 ++++--
 .../scheduling/config/PortConfig.scala             |   3 +-
 .../web/service/ExecutionResultService.scala       |   2 +-
 .../web/service/WorkflowExecutionService.scala     |   2 +-
 9 files changed, 290 insertions(+), 168 deletions(-)

diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala
index a07066390e..0cd28e0ca6 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala
@@ -47,7 +47,8 @@ class ControllerProcessor(
     () => this.workflowScheduler.getNextRegions,
     workflowExecution,
     controllerConfig,
-    asyncRPCClient
+    asyncRPCClient,
+    workflowContext.executionId
   )
 
   private val initializer = new ControllerAsyncRPCHandlerInitializer(this)
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index b2c135471b..6d8b21dd91 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -59,12 +59,8 @@ trait WorkerExecutionCompletedHandler {
     Future
       .collect(Seq(statsRequest))
       .flatMap(_ => {
-        // if entire workflow is completed, clean up
-        if (cp.workflowExecution.isCompleted) {
-          // after query result come back: send completed event, cleanup ,and 
kill workflow
-          sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
-          cp.controllerTimerService.disableStatusUpdate()
-        }
+        // completion notification is handled by the scheduler when all regions finish
+        ()
       })
     EmptyReturn()
   }
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 8d7f5ea889..cb517d4125 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -30,6 +30,7 @@ import 
org.apache.amber.engine.architecture.scheduling.config.{
   ResourceConfig
 }
 import org.apache.amber.engine.common.AmberLogging
+import org.apache.amber.util.serde.GlobalPortIdentitySerde
 import org.jgrapht.Graph
 import org.jgrapht.alg.connectivity.BiconnectivityInspector
 import org.jgrapht.graph.{DirectedAcyclicGraph, DirectedPseudograph}
@@ -69,6 +70,12 @@ class CostBasedScheduleGenerator(
       actorId = actorId
     )
 
+  private val cachedOutputsByPort
+      : Map[GlobalPortIdentity, CachedOutput] = 
workflowContext.workflowSettings.cachedOutputs.map {
+    case (serializedId, cachedOutput) =>
+      GlobalPortIdentitySerde.deserializeFromString(serializedId) -> 
cachedOutput
+  }
+
   private case class CostEstimatorMemoKey(
       physicalOpIds: Set[PhysicalOpIdentity],
       physicalLinkIds: Set[PhysicalLink],
@@ -76,6 +83,134 @@ class CostBasedScheduleGenerator(
       resourceConfig: Option[ResourceConfig]
   )
 
+  /**
+    * Classify regions as cached vs must-execute and rebuild port configs accordingly.
+    * A region is must-execute if it has a visible port without cache, or if it feeds a region
+    * that needs a materialization without cache (propagated upstream).
+    */
+  private def annotateRegionsWithCacheInfo(
+      regionDAG: DirectedAcyclicGraph[Region, RegionLink],
+      matEdges: Set[PhysicalLink],
+      opToRegionMap: Map[PhysicalOpIdentity, Region]
+  ): Unit = {
+    val regions = regionDAG.vertexSet().asScala.toSet
+
+    val needsByRegion = computePortsNeedingStorage(regions, matEdges, 
opToRegionMap)
+    val mustExecute = computeMustExecuteRegions(needsByRegion, matEdges, 
opToRegionMap)
+    val cachedRegions = regions.diff(mustExecute)
+    val outputConfigByPort = buildOutputPortConfigs(needsByRegion, 
cachedRegions)
+    val inputConfigByPort = buildInputPortConfigs(matEdges, outputConfigByPort)
+
+    // Rebuild each region with refreshed port configs and cached flag.
+    regions.foreach { region =>
+      val outputCfgs = needsByRegion(region).map(pid => pid -> 
outputConfigByPort(pid)).toMap
+      val inputCfgs = inputConfigByPort.filter { case (pid, _) => 
region.ports.contains(pid) }
+      val portConfigs = outputCfgs ++ inputCfgs
+      val newResourceConfig =
+        if (portConfigs.nonEmpty) Some(ResourceConfig(portConfigs = 
portConfigs))
+        else region.resourceConfig
+
+      val newRegion = region.copy(
+        resourceConfig = newResourceConfig,
+        cached = cachedRegions.contains(region)
+      )
+      replaceVertex(regionDAG, region, newRegion)
+    }
+  }
+
+  private def computePortsNeedingStorage(
+      regions: Set[Region],
+      matEdges: Set[PhysicalLink],
+      opToRegionMap: Map[PhysicalOpIdentity, Region]
+  ): Map[Region, Set[GlobalPortIdentity]] = {
+    // Ports that require materialization for this run: scheduler-imposed (matEdges) + UI-visible ports.
+    regions.map { region =>
+      val fromMatEdges = matEdges
+        .filter(e => opToRegionMap(e.fromOpId) == region)
+        .map(e => GlobalPortIdentity(e.fromOpId, e.fromPortId))
+      val visiblePorts = 
workflowContext.workflowSettings.outputPortsNeedingStorage
+        .filter(pid => region.physicalOps.exists(_.id == pid.opId))
+      region -> (fromMatEdges ++ visiblePorts)
+    }.toMap
+  }
+
+  private def computeMustExecuteRegions(
+      needsByRegion: Map[Region, Set[GlobalPortIdentity]],
+      matEdges: Set[PhysicalLink],
+      opToRegionMap: Map[PhysicalOpIdentity, Region]
+  ): Set[Region] = {
+    // Seed with regions that have any needed port lacking cache; then propagate upstream along materialized edges.
+    val mustExecute = mutable.Set[Region]()
+    needsByRegion.foreach {
+      case (region, needs) =>
+        if (needs.exists(pid => !cachedOutputsByPort.contains(pid))) {
+          mustExecute += region
+        }
+    }
+    var changed = true
+    while (changed) {
+      changed = false
+      matEdges.foreach { e =>
+        val fromRegion = opToRegionMap(e.fromOpId)
+        val toRegion = opToRegionMap(e.toOpId)
+        val outPort = GlobalPortIdentity(e.fromOpId, e.fromPortId)
+        if (
+          mustExecute.contains(toRegion) &&
+          !cachedOutputsByPort.contains(outPort) &&
+          !mustExecute.contains(fromRegion)
+        ) {
+          mustExecute += fromRegion
+          changed = true
+        }
+      }
+    }
+    mustExecute.toSet
+  }
+
+  private def buildOutputPortConfigs(
+      needsByRegion: Map[Region, Set[GlobalPortIdentity]],
+      cachedRegions: Set[Region]
+  ): Map[GlobalPortIdentity, OutputPortConfig] = {
+    // For cached regions, reuse cached URI + tuple count; otherwise allocate new URI with no cached count.
+    needsByRegion.flatMap {
+      case (region, ports) =>
+        ports.map { gpid =>
+          val (uri, cachedCount) =
+            if (cachedRegions.contains(region)) {
+              val cached = cachedOutputsByPort(gpid)
+              (cached.resultUri, cached.tupleCount)
+            } else {
+              (
+                createResultURI(
+                  workflowId = workflowContext.workflowId,
+                  executionId = workflowContext.executionId,
+                  globalPortId = gpid
+                ),
+                None
+              )
+            }
+          gpid -> OutputPortConfig(uri, cachedCount)
+        }
+    }
+  }
+
+  private def buildInputPortConfigs(
+      matEdges: Set[PhysicalLink],
+      outputConfigByPort: Map[GlobalPortIdentity, OutputPortConfig]
+  ): Map[GlobalPortIdentity, IntermediateInputPortConfig] = {
+    matEdges
+      .groupBy(e => GlobalPortIdentity(e.toOpId, e.toPortId, input = true))
+      .map {
+        case (inputPid, links) =>
+          val uris = links
+            .map(link =>
+              outputConfigByPort(GlobalPortIdentity(link.fromOpId, 
link.fromPortId)).storageURI
+            )
+            .toList
+          inputPid -> IntermediateInputPortConfig(uris)
+      }
+  }
+
   private val costEstimatorMemoization
       : mutable.Map[CostEstimatorMemoKey, (ResourceConfig, Double)] =
     new mutable.HashMap()
@@ -100,39 +235,20 @@ class CostBasedScheduleGenerator(
   }
 
   /**
-    * Partitions a physical plan into Regions and assigns storage URIs in two 
passes.
-    *
-    * <p><strong>Overview</strong></p>
-    * <ol>
-    *   <li><strong>Region construction:</strong>
-    *     Remove all materialized edges from the DAG and compute undirected 
connected
-    *     components. The resulting “Region Graph” may contain directed 
cycles.</li>
-    *   <li><strong>Pass 1 – Output URIs:</strong>
-    *     For each Region, allocate storage URIs on every output port of 
materialized edges.</li>
-    *   <li><strong>Pass 2 – Input URIs:</strong>
-    *     Re-traverse the same Regions and attach reader URIs on input ports 
using
-    *     the URIs created in Pass 1.</li>
-    * </ol>
-    *
-    * <p><strong>Why two passes?</strong></p>
-    * <ul>
-    *   <li>Potential directed cycles in the Region Graph makes a topological
-    *       traversal of regions inpossible.</li>
-    *   <li>To ensure every output URI exists before its corresponding reader 
is assigned,
-    *       and avoiding “reader before writer” holes, two passes are 
required.</li>
-    * </ul>
+    * Partitions a physical plan into Regions (skeletons). Cache vs must-execute
+    * classification and URI assignment are applied later in annotateRegionsWithCacheInfo
+    * using the RegionDAG and materialization edges.
     *
     * @param physicalPlan the original physical plan (without materializations)
     * @param matEdges     edges to be materialized (including blocking edges)
-    * @return a set of `Region`s whose `ResourceConfig` contains only `URI`s 
for `PortConfig`s
-    *         (`Partitioning` to be assigned later in `ResourceAllocator`; see 
`IntermediateInputPortConfig`.)
+    * @return a set of `Region` skeletons (resourceConfig/cached filled later)
     */
   private def createRegions(
       physicalPlan: PhysicalPlan,
       matEdges: Set[PhysicalLink]
   ): Set[Region] = {
 
-    // Pass 0 – remove materialized edges and create connected components
+    // remove materialized edges and create connected components
 
     val matEdgesRemovedDAG: PhysicalPlan = 
matEdges.foldLeft(physicalPlan)(_.removeLink(_))
 
@@ -141,9 +257,9 @@ class CostBasedScheduleGenerator(
         matEdgesRemovedDAG.dag
       ).getConnectedComponents.asScala.toSet
 
-    // Pass 1 – build Regions only output-port storage URIs
+    //  build region skeletons with no materialization information
 
-    val regionsWithOnlyOutputPortURIs: Set[Region] = 
connectedComponents.zipWithIndex.map {
+    val regionSkeletons: Set[Region] = connectedComponents.zipWithIndex.map {
       case (connectedSubDAG, idx) =>
         // Operators and intra‑region pipelined links
 
@@ -159,31 +275,6 @@ class CostBasedScheduleGenerator(
 
         val physicalOps: Set[PhysicalOp] = 
operators.map(physicalPlan.getOperator)
 
-        // Frontend-specified ports that need to be materailized (output ports 
of "eye-icon" physicalOps)
-        val outputPortIdsToViewResult: Set[GlobalPortIdentity] =
-          workflowContext.workflowSettings.outputPortsNeedingStorage
-            .filter(pid => operators.contains(pid.opId))
-
-        // Contains both frontend-specified and scheduler-decided ports that 
require materailizations.
-        val outputPortIdsNeedingStorage: Set[GlobalPortIdentity] =
-          matEdges
-            .filter(e => operators.contains(e.fromOpId))
-            .map(e => GlobalPortIdentity(e.fromOpId, e.fromPortId)) ++
-            outputPortIdsToViewResult
-
-        // Allocate an URI for each of these output ports
-        val outputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] =
-          outputPortIdsNeedingStorage.map { gpid =>
-            val outputWriterURI = createResultURI(
-              workflowId = workflowContext.workflowId,
-              executionId = workflowContext.executionId,
-              globalPortId = gpid
-            )
-            gpid -> OutputPortConfig(outputWriterURI)
-          }.toMap
-
-        val resourceConfig = ResourceConfig(portConfigs = outputPortConfigs)
-
         // Enumerate all ports belonging to the Region
         val ports: Set[GlobalPortIdentity] = physicalOps.flatMap { op =>
           op.inputPorts.keys
@@ -193,73 +284,19 @@ class CostBasedScheduleGenerator(
             .toSet
         }
 
-        // Build the Region skeleton (no input‑port URIs yet)
+        // Build the Region skeleton; cache/resourceConfig to be populated after classification.
         Region(
           id = RegionIdentity(idx),
           physicalOps = physicalOps,
           physicalLinks = links,
           ports = ports,
-          resourceConfig = Some(resourceConfig)
+          resourceConfig = None,
+          cached = false
         )
     }
 
-    // Collect writer‑side configs so we can look them up in Pass 2
-    val allOutputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] =
-      regionsWithOnlyOutputPortURIs
-        .flatMap(_.resourceConfig) // Seq[ResourceConfig]
-        .flatMap(_.portConfigs.collect { // PortConfig → OutputPortConfig
-          case (id, cfg: OutputPortConfig) => id -> cfg
-        })
-        .toMap
-
-    // Pass 2 – add input‑port storage configs (reader URIs)
-
-    regionsWithOnlyOutputPortURIs.map { existingRegion =>
-      // MatEdges that originally connected to the input ports of this region.
-      val relevantMatEdges: Set[PhysicalLink] = matEdges.filter { matEdge =>
-        existingRegion.getOperators.exists(_.id == matEdge.toOpId)
-      }
-
-      // Assign storage URIs to input ports of each materialized edge (each 
input port could have more than one URI)
-      val inputPortConfigs: Map[GlobalPortIdentity, 
IntermediateInputPortConfig] =
-        relevantMatEdges
-          .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) { (acc, link) =>
-            val globalOutputPortId = GlobalPortIdentity(link.fromOpId, 
link.fromPortId)
-            val globalInputPortId = GlobalPortIdentity(link.toOpId, 
link.toPortId, input = true)
-
-            // Writer‑side URI that must already exist thanks to Pass 1
-            val inputReaderURI = allOutputPortConfigs
-              .getOrElse(
-                globalOutputPortId,
-                throw new IllegalStateException(
-                  s"Materialization edge $link: attempting to assign a 
materialization " +
-                    s"reader URI for input port $globalInputPortId when " +
-                    s"the outout port $globalOutputPortId has not been 
assigned a URI yet."
-                )
-              )
-              .storageURI
-
-            // Group all available URIs of this input port together
-            acc.updated(
-              globalInputPortId,
-              acc.getOrElse(globalInputPortId, List.empty[URI]) :+ 
inputReaderURI
-            )
-          }
-          .map {
-            case (inputPortId, uris) =>
-              inputPortId -> IntermediateInputPortConfig(uris)
-          }
-
-      val newResourceConfig: Option[ResourceConfig] = 
existingRegion.resourceConfig match {
-        case Some(existingConfig) =>
-          Some(ResourceConfig(portConfigs = existingConfig.portConfigs ++ 
inputPortConfigs))
-        case None =>
-          if (inputPortConfigs.nonEmpty) Some(ResourceConfig(portConfigs = 
inputPortConfigs))
-          else None
-      }
-
-      existingRegion.copy(resourceConfig = newResourceConfig)
-    }
+    // Regions are returned as skeletons; cache/URI assignment happens in annotateRegionsWithCacheInfo.
+    regionSkeletons
   }
 
   /**
@@ -292,7 +329,10 @@ class CostBasedScheduleGenerator(
           isAcyclic = false
       }
     })
-    if (isAcyclic) Left(regionDAG)
+    if (isAcyclic) {
+      annotateRegionsWithCacheInfo(regionDAG, matEdges, opToRegionMap.toMap)
+      Left(regionDAG)
+    }
     else Right(regionGraph)
   }
 
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/Region.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/Region.scala
index d6096c9a29..b3e7137f81 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/Region.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/Region.scala
@@ -35,7 +35,8 @@ case class Region(
     physicalOps: Set[PhysicalOp],
     physicalLinks: Set[PhysicalLink],
     ports: Set[GlobalPortIdentity] = Set.empty,
-    resourceConfig: Option[ResourceConfig] = None
+    resourceConfig: Option[ResourceConfig] = None,
+    cached: Boolean = false
 ) {
 
   private val operators: Map[PhysicalOpIdentity, PhysicalOp] =
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index a83af49dde..54fd6ef703 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -25,31 +25,14 @@ import org.apache.amber.core.storage.DocumentFactory
 import org.apache.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, 
PhysicalOp}
-import org.apache.amber.engine.architecture.common.{
-  AkkaActorRefMappingService,
-  AkkaActorService,
-  ExecutorDeployment
-}
-import org.apache.amber.engine.architecture.controller.execution.{
-  OperatorExecution,
-  RegionExecution,
-  WorkflowExecution
-}
-import org.apache.amber.engine.architecture.controller.{
-  ControllerConfig,
-  ExecutionStatsUpdate,
-  WorkerAssignmentUpdate
-}
+import 
org.apache.amber.engine.architecture.common.{AkkaActorRefMappingService, 
AkkaActorService, ExecutorDeployment}
+import 
org.apache.amber.engine.architecture.controller.execution.{OperatorExecution, 
RegionExecution, WorkflowExecution}
+import org.apache.amber.engine.architecture.controller.{ControllerConfig, 
ExecutionStateUpdate, ExecutionStatsUpdate, WorkerAssignmentUpdate}
 import org.apache.amber.engine.architecture.rpc.controlcommands._
 import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn
-import org.apache.amber.engine.architecture.scheduling.config.{
-  InputPortConfig,
-  OperatorConfig,
-  OutputPortConfig,
-  ResourceConfig
-}
+import 
org.apache.amber.engine.architecture.scheduling.config.{InputPortConfig, 
OperatorConfig, OutputPortConfig, ResourceConfig, WorkerConfig}
 import 
org.apache.amber.engine.architecture.sendsemantics.partitionings.Partitioning
-import org.apache.amber.engine.architecture.worker.statistics.WorkerState
+import 
org.apache.amber.engine.architecture.worker.statistics.{PortTupleMetricsMapping,
 TupleMetrics, WorkerState, WorkerStatistics}
 import org.apache.amber.engine.common.AmberLogging
 import org.apache.amber.engine.common.FutureBijection._
 import org.apache.amber.engine.common.rpc.AsyncRPCClient
@@ -91,14 +74,13 @@ import scala.concurrent.duration.Duration
 class RegionExecutionCoordinator(
     region: Region,
     workflowExecution: WorkflowExecution,
+    executionId: org.apache.amber.core.virtualidentity.ExecutionIdentity,
     asyncRPCClient: AsyncRPCClient,
     controllerConfig: ControllerConfig,
     actorService: AkkaActorService,
     actorRefService: AkkaActorRefMappingService
 ) extends AmberLogging {
 
-  initRegionExecution()
-
   private sealed trait RegionExecutionPhase
   private case object Unexecuted extends RegionExecutionPhase
   private case object ExecutingDependeePortsPhase extends RegionExecutionPhase
@@ -109,6 +91,63 @@ class RegionExecutionCoordinator(
     Unexecuted
   )
 
+  if (region.cached) {
+    completeCachedRegion()
+  } else {
+    initRegionExecution()
+  }
+
+  private def completeCachedRegion(): Unit = {
+    val regionExecution = workflowExecution.getRegionExecution(region.id)
+    val resourceConfig = region.resourceConfig.getOrElse(ResourceConfig())
+    region.getOperators.foreach { op =>
+      val opExecution = regionExecution.initOperatorExecution(op.id)
+      val workerConfigs = resourceConfig.operatorConfigs
+        .get(op.id)
+        .map(_.workerConfigs)
+        .getOrElse(WorkerConfig.generateWorkerConfigs(op))
+      workerConfigs.foreach { workerCfg =>
+        val workerExecution = 
opExecution.initWorkerExecution(workerCfg.workerId)
+        op.inputPorts.keys.foreach(pid => 
workerExecution.getInputPortExecution(pid).setCompleted())
+        op.outputPorts.keys.foreach(pid => 
workerExecution.getOutputPortExecution(pid).setCompleted())
+        val outputMetrics = op.outputPorts.keys.map { pid =>
+          val count = resourceConfig.portConfigs.collectFirst {
+            case (gpid, cfg: OutputPortConfig) if gpid == 
GlobalPortIdentity(op.id, pid) =>
+              cfg.cachedTupleCount.getOrElse(0L)
+          }.getOrElse(0L)
+          PortTupleMetricsMapping(pid, TupleMetrics(count, 0L))
+        }.toSeq
+        val inputMetrics = op.inputPorts.keys
+          .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L)))
+          .toSeq
+        val stats = WorkerStatistics(
+          inputMetrics,
+          outputMetrics,
+          dataProcessingTime = 0L,
+          controlProcessingTime = 0L,
+          idleTime = 0L
+        )
+        workerExecution.update(System.nanoTime(), WorkerState.COMPLETED, stats)
+      }
+    }
+    recordCachedOutputPortResults(resourceConfig)
+    asyncRPCClient.sendToClient(
+      ExecutionStatsUpdate(workflowExecution.getAllRegionExecutionsStats)
+    )
+    setPhase(Completed)
+  }
+
+  private def recordCachedOutputPortResults(resourceConfig: ResourceConfig): 
Unit = {
+    resourceConfig.portConfigs.collect { case (gpid, cfg: OutputPortConfig) =>
+      val storageUri = cfg.storageURI
+      WorkflowExecutionsResource.insertOperatorPortResultUri(
+        eid = executionId,
+        globalPortId = gpid,
+        uri = storageUri
+      )
+    }
+  }
+
   /**
     * Sync the status of `RegionExecution` and transition this coordinator's 
phase to `Completed` only when the
     * coordinator is currently in `ExecutingNonDependeePortsPhase` and all the 
ports of this region are completed.
@@ -225,6 +264,10 @@ class RegionExecutionCoordinator(
     }
 
   private def executeDependeePortPhase(): Future[Unit] = {
+    if (region.cached) {
+      // Cached region short-circuits all execution.
+      return Future.Unit
+    }
     setPhase(ExecutingDependeePortsPhase)
     if (!region.getOperators.exists(_.dependeeInputs.nonEmpty)) {
       // Skip to the next phase when there are no dependee input ports
@@ -351,6 +394,9 @@ class RegionExecutionCoordinator(
       operators: Set[PhysicalOp],
       resourceConfig: ResourceConfig
   ): Future[Seq[EmptyReturn]] = {
+    if (region.cached) {
+      return Future.value(Seq.empty)
+    }
     Future
       .collect(
         operators
@@ -459,6 +505,9 @@ class RegionExecutionCoordinator(
   }
 
   private def connectChannels(links: Set[PhysicalLink]): 
Future[Seq[EmptyReturn]] = {
+    if (region.cached) {
+      return Future.value(Seq.empty)
+    }
     Future.collect(
       links.map { link: PhysicalLink =>
         asyncRPCClient.controllerInterface.linkWorkers(
@@ -470,6 +519,9 @@ class RegionExecutionCoordinator(
   }
 
   private def openOperators(operators: Set[PhysicalOp]): 
Future[Seq[EmptyReturn]] = {
+    if (region.cached) {
+      return Future.value(Seq.empty)
+    }
     Future
       .collect(
         operators
@@ -489,6 +541,9 @@ class RegionExecutionCoordinator(
       region: Region,
       isDependeePhase: Boolean
   ): Future[Seq[Unit]] = {
+    if (region.cached) {
+      return Future.value(Seq.empty)
+    }
     asyncRPCClient.sendToClient(
       ExecutionStatsUpdate(
         workflowExecution.getAllRegionExecutionsStats
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 48cbe3440c..72c575ddd3 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -23,7 +23,7 @@ import com.twitter.util.Future
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.amber.core.workflow.{GlobalPortIdentity, PhysicalLink}
 import 
org.apache.amber.engine.architecture.common.{AkkaActorRefMappingService, 
AkkaActorService}
-import org.apache.amber.engine.architecture.controller.ControllerConfig
+import org.apache.amber.engine.architecture.controller.{ControllerConfig, 
ExecutionStateUpdate}
 import 
org.apache.amber.engine.architecture.controller.execution.WorkflowExecution
 import org.apache.amber.engine.common.rpc.AsyncRPCClient
 
@@ -33,7 +33,8 @@ class WorkflowExecutionCoordinator(
     getNextRegions: () => Set[Region],
     workflowExecution: WorkflowExecution,
     controllerConfig: ControllerConfig,
-    asyncRPCClient: AsyncRPCClient
+    asyncRPCClient: AsyncRPCClient,
+    executionId: org.apache.amber.core.virtualidentity.ExecutionIdentity
 ) extends LazyLogging {
 
   private val executedRegions: mutable.ListBuffer[Set[Region]] = 
mutable.ListBuffer()
@@ -76,27 +77,54 @@ class WorkflowExecutionCoordinator(
     }
 
     // All existing regions are completed. Start the next region (if any).
-    Future
-      .collect({
-        val nextRegions = getNextRegions()
-        executedRegions.append(nextRegions)
-        nextRegions
-          .map(region => {
-            workflowExecution.initRegionExecution(region)
-            regionExecutionCoordinators(region.id) = new 
RegionExecutionCoordinator(
-              region,
-              workflowExecution,
-              asyncRPCClient,
-              controllerConfig,
-              actorService,
-              actorRefService
+    val nextRegions = getNextRegions()
+    if (nextRegions.isEmpty) {
+      if (regionExecutionCoordinators.values.forall(_.isCompleted)) {
+        asyncRPCClient.sendToClient(
+          ExecutionStateUpdate(workflowExecution.getState)
+        )
+      }
+      Future.Unit
+    } else {
+      executedRegions.append(nextRegions)
+      val launches = nextRegions
+        .map(region => {
+          workflowExecution.initRegionExecution(region)
+          regionExecutionCoordinators(region.id) = new 
RegionExecutionCoordinator(
+            region,
+            workflowExecution,
+            executionId,
+            asyncRPCClient,
+            controllerConfig,
+            actorService,
+            actorRefService
+          )
+          regionExecutionCoordinators(region.id)
+        })
+        .map(_.syncStatusAndTransitionRegionExecutionPhase())
+        .toSeq
+      Future
+        .collect(launches)
+        .unit
+        .flatMap { _ =>
+          if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
+            Future.Unit
+          } else {
+            // All launched regions finished immediately (e.g., cached); proceed to next batch.
+            coordinateRegionExecutors(actorService)
+          }
+        }
+        .map { _ =>
+          if (
+            regionExecutionCoordinators.values.forall(_.isCompleted) &&
+            workflowExecution.isCompleted
+          ) {
+            asyncRPCClient.sendToClient(
+              ExecutionStateUpdate(workflowExecution.getState)
             )
-            regionExecutionCoordinators(region.id)
-          })
-          .map(_.syncStatusAndTransitionRegionExecutionPhase())
-          .toSeq
-      })
-      .unit
+          }
+        }
+    }
   }
 
   def getRegionOfLink(link: PhysicalLink): Region = {
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/config/PortConfig.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/config/PortConfig.scala
index e76651eb10..df230a6170 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/config/PortConfig.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/config/PortConfig.scala
@@ -32,7 +32,8 @@ sealed trait PortConfig {
 }
 
 /** An output port requires exactly one materialization URI. */
-final case class OutputPortConfig(storageURI: URI) extends PortConfig {
+final case class OutputPortConfig(storageURI: URI, cachedTupleCount: 
Option[Long] = None)
+    extends PortConfig {
   override val storageURIs: List[URI] = List(storageURI)
 }
 
diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
index 246c4e4967..926b03b824 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
@@ -330,7 +330,7 @@ class ExecutionResultService(
             evt.state == COMPLETED || evt.state == FAILED || evt.state == 
KILLED || evt.state == TERMINATED
           ) {
             logger.info("Workflow execution terminated. Stop update results.")
-            if (resultUpdateCancellable.cancel() || 
resultUpdateCancellable.isCancelled) {
+            if (resultUpdateCancellable == null|| 
resultUpdateCancellable.cancel() || resultUpdateCancellable.isCancelled) {
               // immediately perform final update
               onResultUpdate(executionId, physicalPlan)
             }
diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
index 3d50a9eb89..cf7f999d5f 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
@@ -179,7 +179,7 @@ class WorkflowExecutionService(
       })
       .onSuccess(resp =>
         executionStateStore.metadataStore.updateState(metadataStore =>
-          if (metadataStore.state != FAILED) {
+          if (metadataStore.state != FAILED && metadataStore.state != 
COMPLETED) {
             updateWorkflowState(resp.workflowState, metadataStore)
           } else {
             metadataStore

Reply via email to