This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 241b98c20284229e101f244f6232bd353f19e84d Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Dec 1 15:45:33 2025 -0800 feat(cache): reuse cached URIs in scheduler and short-circuit cached regions during execution --- .../controller/ControllerProcessor.scala | 3 +- .../WorkerExecutionCompletedHandler.scala | 8 +- .../scheduling/CostBasedScheduleGenerator.scala | 262 ++++++++++++--------- .../engine/architecture/scheduling/Region.scala | 3 +- .../scheduling/RegionExecutionCoordinator.scala | 103 ++++++-- .../scheduling/WorkflowExecutionCoordinator.scala | 72 ++++-- .../scheduling/config/PortConfig.scala | 3 +- .../web/service/ExecutionResultService.scala | 2 +- .../web/service/WorkflowExecutionService.scala | 2 +- 9 files changed, 290 insertions(+), 168 deletions(-) diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala index a07066390e..0cd28e0ca6 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala @@ -47,7 +47,8 @@ class ControllerProcessor( () => this.workflowScheduler.getNextRegions, workflowExecution, controllerConfig, - asyncRPCClient + asyncRPCClient, + workflowContext.executionId ) private val initializer = new ControllerAsyncRPCHandlerInitializer(this) diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala index b2c135471b..6d8b21dd91 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala @@ -59,12 +59,8 @@ trait WorkerExecutionCompletedHandler { Future .collect(Seq(statsRequest)) .flatMap(_ => { - // if entire workflow is completed, clean up - if (cp.workflowExecution.isCompleted) { - // after query result come back: send completed event, cleanup ,and kill workflow - sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState)) - cp.controllerTimerService.disableStatusUpdate() - } + // completion notification is handled by the scheduler when all regions finish + () }) EmptyReturn() } diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala index 8d7f5ea889..cb517d4125 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala @@ -30,6 +30,7 @@ import org.apache.amber.engine.architecture.scheduling.config.{ ResourceConfig } import org.apache.amber.engine.common.AmberLogging +import org.apache.amber.util.serde.GlobalPortIdentitySerde import org.jgrapht.Graph import org.jgrapht.alg.connectivity.BiconnectivityInspector import org.jgrapht.graph.{DirectedAcyclicGraph, DirectedPseudograph} @@ -69,6 +70,12 @@ class CostBasedScheduleGenerator( actorId = actorId ) + private val cachedOutputsByPort + : Map[GlobalPortIdentity, CachedOutput] = workflowContext.workflowSettings.cachedOutputs.map { + case (serializedId, cachedOutput) => + GlobalPortIdentitySerde.deserializeFromString(serializedId) -> cachedOutput + } + private case class CostEstimatorMemoKey( physicalOpIds: Set[PhysicalOpIdentity], physicalLinkIds: Set[PhysicalLink], @@ -76,6 +83,134 @@ class CostBasedScheduleGenerator( resourceConfig: Option[ResourceConfig] ) + /** + * Classify regions as cached vs must-execute and rebuild port configs accordingly. + * A region is must-execute if it has a visible port without cache, or if it feeds a region + * that needs a materialization without cache (propagated upstream). + */ + private def annotateRegionsWithCacheInfo( + regionDAG: DirectedAcyclicGraph[Region, RegionLink], + matEdges: Set[PhysicalLink], + opToRegionMap: Map[PhysicalOpIdentity, Region] + ): Unit = { + val regions = regionDAG.vertexSet().asScala.toSet + + val needsByRegion = computePortsNeedingStorage(regions, matEdges, opToRegionMap) + val mustExecute = computeMustExecuteRegions(needsByRegion, matEdges, opToRegionMap) + val cachedRegions = regions.diff(mustExecute) + val outputConfigByPort = buildOutputPortConfigs(needsByRegion, cachedRegions) + val inputConfigByPort = buildInputPortConfigs(matEdges, outputConfigByPort) + + // Rebuild each region with refreshed port configs and cached flag. + regions.foreach { region => + val outputCfgs = needsByRegion(region).map(pid => pid -> outputConfigByPort(pid)).toMap + val inputCfgs = inputConfigByPort.filter { case (pid, _) => region.ports.contains(pid) } + val portConfigs = outputCfgs ++ inputCfgs + val newResourceConfig = + if (portConfigs.nonEmpty) Some(ResourceConfig(portConfigs = portConfigs)) + else region.resourceConfig + + val newRegion = region.copy( + resourceConfig = newResourceConfig, + cached = cachedRegions.contains(region) + ) + replaceVertex(regionDAG, region, newRegion) + } + } + + private def computePortsNeedingStorage( + regions: Set[Region], + matEdges: Set[PhysicalLink], + opToRegionMap: Map[PhysicalOpIdentity, Region] + ): Map[Region, Set[GlobalPortIdentity]] = { + // Ports that require materialization for this run: scheduler-imposed (matEdges) + UI-visible ports. + regions.map { region => + val fromMatEdges = matEdges + .filter(e => opToRegionMap(e.fromOpId) == region) + .map(e => GlobalPortIdentity(e.fromOpId, e.fromPortId)) + val visiblePorts = workflowContext.workflowSettings.outputPortsNeedingStorage + .filter(pid => region.physicalOps.exists(_.id == pid.opId)) + region -> (fromMatEdges ++ visiblePorts) + }.toMap + } + + private def computeMustExecuteRegions( + needsByRegion: Map[Region, Set[GlobalPortIdentity]], + matEdges: Set[PhysicalLink], + opToRegionMap: Map[PhysicalOpIdentity, Region] + ): Set[Region] = { + // Seed with regions that have any needed port lacking cache; then propagate upstream along materialized edges. + val mustExecute = mutable.Set[Region]() + needsByRegion.foreach { + case (region, needs) => + if (needs.exists(pid => !cachedOutputsByPort.contains(pid))) { + mustExecute += region + } + } + var changed = true + while (changed) { + changed = false + matEdges.foreach { e => + val fromRegion = opToRegionMap(e.fromOpId) + val toRegion = opToRegionMap(e.toOpId) + val outPort = GlobalPortIdentity(e.fromOpId, e.fromPortId) + if ( + mustExecute.contains(toRegion) && + !cachedOutputsByPort.contains(outPort) && + !mustExecute.contains(fromRegion) + ) { + mustExecute += fromRegion + changed = true + } + } + } + mustExecute.toSet + } + + private def buildOutputPortConfigs( + needsByRegion: Map[Region, Set[GlobalPortIdentity]], + cachedRegions: Set[Region] + ): Map[GlobalPortIdentity, OutputPortConfig] = { + // For cached regions, reuse cached URI + tuple count; otherwise allocate new URI with no cached count. + needsByRegion.flatMap { + case (region, ports) => + ports.map { gpid => + val (uri, cachedCount) = + if (cachedRegions.contains(region)) { + val cached = cachedOutputsByPort(gpid) + (cached.resultUri, cached.tupleCount) + } else { + ( + createResultURI( + workflowId = workflowContext.workflowId, + executionId = workflowContext.executionId, + globalPortId = gpid + ), + None + ) + } + gpid -> OutputPortConfig(uri, cachedCount) + } + } + } + + private def buildInputPortConfigs( + matEdges: Set[PhysicalLink], + outputConfigByPort: Map[GlobalPortIdentity, OutputPortConfig] + ): Map[GlobalPortIdentity, IntermediateInputPortConfig] = { + matEdges + .groupBy(e => GlobalPortIdentity(e.toOpId, e.toPortId, input = true)) + .map { + case (inputPid, links) => + val uris = links + .map(link => + outputConfigByPort(GlobalPortIdentity(link.fromOpId, link.fromPortId)).storageURI + ) + .toList + inputPid -> IntermediateInputPortConfig(uris) + } + } + private val costEstimatorMemoization : mutable.Map[CostEstimatorMemoKey, (ResourceConfig, Double)] = new mutable.HashMap() @@ -100,39 +235,20 @@ class CostBasedScheduleGenerator( } /** - * Partitions a physical plan into Regions and assigns storage URIs in two passes. - * - * <p><strong>Overview</strong></p> - * <ol> - * <li><strong>Region construction:</strong> - * Remove all materialized edges from the DAG and compute undirected connected - * components. The resulting “Region Graph” may contain directed cycles.</li> - * <li><strong>Pass 1 – Output URIs:</strong> - * For each Region, allocate storage URIs on every output port of materialized edges.</li> - * <li><strong>Pass 2 – Input URIs:</strong> - * Re-traverse the same Regions and attach reader URIs on input ports using - * the URIs created in Pass 1.</li> - * </ol> - * - * <p><strong>Why two passes?</strong></p> - * <ul> - * <li>Potential directed cycles in the Region Graph makes a topological - * traversal of regions inpossible.</li> - * <li>To ensure every output URI exists before its corresponding reader is assigned, - * and avoiding “reader before writer” holes, two passes are required.</li> - * </ul> + * Partitions a physical plan into Regions (skeletons). Cache vs must-execute + * classification and URI assignment are applied later in annotateRegionsWithCacheInfo + * using the RegionDAG and materialization edges. * * @param physicalPlan the original physical plan (without materializations) * @param matEdges edges to be materialized (including blocking edges) - * @return a set of `Region`s whose `ResourceConfig` contains only `URI`s for `PortConfig`s - * (`Partitioning` to be assigned later in `ResourceAllocator`; see `IntermediateInputPortConfig`.) + * @return a set of `Region` skeletons (resourceConfig/cached filled later) */ private def createRegions( physicalPlan: PhysicalPlan, matEdges: Set[PhysicalLink] ): Set[Region] = { - // Pass 0 – remove materialized edges and create connected components + // remove materialized edges and create connected components val matEdgesRemovedDAG: PhysicalPlan = matEdges.foldLeft(physicalPlan)(_.removeLink(_)) @@ -141,9 +257,9 @@ class CostBasedScheduleGenerator( matEdgesRemovedDAG.dag ).getConnectedComponents.asScala.toSet - // Pass 1 – build Regions only output-port storage URIs + // build region skeletons with no materialization information - val regionsWithOnlyOutputPortURIs: Set[Region] = connectedComponents.zipWithIndex.map { + val regionSkeletons: Set[Region] = connectedComponents.zipWithIndex.map { case (connectedSubDAG, idx) => // Operators and intra‑region pipelined links @@ -159,31 +275,6 @@ class CostBasedScheduleGenerator( val physicalOps: Set[PhysicalOp] = operators.map(physicalPlan.getOperator) - // Frontend-specified ports that need to be materailized (output ports of "eye-icon" physicalOps) - val outputPortIdsToViewResult: Set[GlobalPortIdentity] = - workflowContext.workflowSettings.outputPortsNeedingStorage - .filter(pid => operators.contains(pid.opId)) - - // Contains both frontend-specified and scheduler-decided ports that require materailizations. - val outputPortIdsNeedingStorage: Set[GlobalPortIdentity] = - matEdges - .filter(e => operators.contains(e.fromOpId)) - .map(e => GlobalPortIdentity(e.fromOpId, e.fromPortId)) ++ - outputPortIdsToViewResult - - // Allocate an URI for each of these output ports - val outputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] = - outputPortIdsNeedingStorage.map { gpid => - val outputWriterURI = createResultURI( - workflowId = workflowContext.workflowId, - executionId = workflowContext.executionId, - globalPortId = gpid - ) - gpid -> OutputPortConfig(outputWriterURI) - }.toMap - - val resourceConfig = ResourceConfig(portConfigs = outputPortConfigs) - // Enumerate all ports belonging to the Region val ports: Set[GlobalPortIdentity] = physicalOps.flatMap { op => op.inputPorts.keys @@ -193,73 +284,19 @@ class CostBasedScheduleGenerator( .toSet } - // Build the Region skeleton (no input‑port URIs yet) + // Build the Region skeleton; cache/resourceConfig to be populated after classification. Region( id = RegionIdentity(idx), physicalOps = physicalOps, physicalLinks = links, ports = ports, - resourceConfig = Some(resourceConfig) + resourceConfig = None, + cached = false ) } - // Collect writer‑side configs so we can look them up in Pass 2 - val allOutputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] = - regionsWithOnlyOutputPortURIs - .flatMap(_.resourceConfig) // Seq[ResourceConfig] - .flatMap(_.portConfigs.collect { // PortConfig → OutputPortConfig - case (id, cfg: OutputPortConfig) => id -> cfg - }) - .toMap - - // Pass 2 – add input‑port storage configs (reader URIs) - - regionsWithOnlyOutputPortURIs.map { existingRegion => - // MatEdges that originally connected to the input ports of this region. - val relevantMatEdges: Set[PhysicalLink] = matEdges.filter { matEdge => - existingRegion.getOperators.exists(_.id == matEdge.toOpId) - } - - // Assign storage URIs to input ports of each materialized edge (each input port could have more than one URI) - val inputPortConfigs: Map[GlobalPortIdentity, IntermediateInputPortConfig] = - relevantMatEdges - .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) { (acc, link) => - val globalOutputPortId = GlobalPortIdentity(link.fromOpId, link.fromPortId) - val globalInputPortId = GlobalPortIdentity(link.toOpId, link.toPortId, input = true) - - // Writer‑side URI that must already exist thanks to Pass 1 - val inputReaderURI = allOutputPortConfigs - .getOrElse( - globalOutputPortId, - throw new IllegalStateException( - s"Materialization edge $link: attempting to assign a materialization " + - s"reader URI for input port $globalInputPortId when " + - s"the outout port $globalOutputPortId has not been assigned a URI yet." - ) - ) - .storageURI - - // Group all available URIs of this input port together - acc.updated( - globalInputPortId, - acc.getOrElse(globalInputPortId, List.empty[URI]) :+ inputReaderURI - ) - } - .map { - case (inputPortId, uris) => - inputPortId -> IntermediateInputPortConfig(uris) - } - - val newResourceConfig: Option[ResourceConfig] = existingRegion.resourceConfig match { - case Some(existingConfig) => - Some(ResourceConfig(portConfigs = existingConfig.portConfigs ++ inputPortConfigs)) - case None => - if (inputPortConfigs.nonEmpty) Some(ResourceConfig(portConfigs = inputPortConfigs)) - else None - } - - existingRegion.copy(resourceConfig = newResourceConfig) - } + // Regions are returned as skeletons; cache/URI assignment happens in annotateRegionsWithCacheInfo. + regionSkeletons } /** @@ -292,7 +329,10 @@ class CostBasedScheduleGenerator( isAcyclic = false } }) - if (isAcyclic) Left(regionDAG) + if (isAcyclic) { + annotateRegionsWithCacheInfo(regionDAG, matEdges, opToRegionMap.toMap) + Left(regionDAG) + } else Right(regionGraph) } diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/Region.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/Region.scala index d6096c9a29..b3e7137f81 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/Region.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/Region.scala @@ -35,7 +35,8 @@ case class Region( physicalOps: Set[PhysicalOp], physicalLinks: Set[PhysicalLink], ports: Set[GlobalPortIdentity] = Set.empty, - resourceConfig: Option[ResourceConfig] = None + resourceConfig: Option[ResourceConfig] = None, + cached: Boolean = false ) { private val operators: Map[PhysicalOpIdentity, PhysicalOp] = diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index a83af49dde..54fd6ef703 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -25,31 +25,14 @@ import org.apache.amber.core.storage.DocumentFactory import org.apache.amber.core.storage.VFSURIFactory.decodeURI import org.apache.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp} -import org.apache.amber.engine.architecture.common.{ - AkkaActorRefMappingService, - AkkaActorService, - ExecutorDeployment -} -import org.apache.amber.engine.architecture.controller.execution.{ - OperatorExecution, - RegionExecution, - WorkflowExecution -} -import org.apache.amber.engine.architecture.controller.{ - ControllerConfig, - ExecutionStatsUpdate, - WorkerAssignmentUpdate -} +import org.apache.amber.engine.architecture.common.{AkkaActorRefMappingService, AkkaActorService, ExecutorDeployment} +import org.apache.amber.engine.architecture.controller.execution.{OperatorExecution, RegionExecution, WorkflowExecution} +import org.apache.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate, ExecutionStatsUpdate, WorkerAssignmentUpdate} import org.apache.amber.engine.architecture.rpc.controlcommands._ import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.amber.engine.architecture.scheduling.config.{ - InputPortConfig, - OperatorConfig, - OutputPortConfig, - ResourceConfig -} +import org.apache.amber.engine.architecture.scheduling.config.{InputPortConfig, OperatorConfig, OutputPortConfig, ResourceConfig, WorkerConfig} import org.apache.amber.engine.architecture.sendsemantics.partitionings.Partitioning -import org.apache.amber.engine.architecture.worker.statistics.WorkerState +import org.apache.amber.engine.architecture.worker.statistics.{PortTupleMetricsMapping, TupleMetrics, WorkerState, WorkerStatistics} import org.apache.amber.engine.common.AmberLogging import org.apache.amber.engine.common.FutureBijection._ import org.apache.amber.engine.common.rpc.AsyncRPCClient @@ -91,14 +74,13 @@ import scala.concurrent.duration.Duration class RegionExecutionCoordinator( region: Region, workflowExecution: WorkflowExecution, + executionId: org.apache.amber.core.virtualidentity.ExecutionIdentity, asyncRPCClient: AsyncRPCClient, controllerConfig: ControllerConfig, actorService: AkkaActorService, actorRefService: AkkaActorRefMappingService ) extends AmberLogging { - initRegionExecution() - private sealed trait RegionExecutionPhase private case object Unexecuted extends RegionExecutionPhase private case object ExecutingDependeePortsPhase extends RegionExecutionPhase @@ -109,6 +91,63 @@ class RegionExecutionCoordinator( Unexecuted ) + if (region.cached) { + completeCachedRegion() + } else { + initRegionExecution() + } + + private def completeCachedRegion(): Unit = { + val regionExecution = workflowExecution.getRegionExecution(region.id) + val resourceConfig = region.resourceConfig.getOrElse(ResourceConfig()) + region.getOperators.foreach { op => + val opExecution = regionExecution.initOperatorExecution(op.id) + val workerConfigs = resourceConfig.operatorConfigs + .get(op.id) + .map(_.workerConfigs) + .getOrElse(WorkerConfig.generateWorkerConfigs(op)) + workerConfigs.foreach { workerCfg => + val workerExecution = opExecution.initWorkerExecution(workerCfg.workerId) + op.inputPorts.keys.foreach(pid => workerExecution.getInputPortExecution(pid).setCompleted()) + op.outputPorts.keys.foreach(pid => workerExecution.getOutputPortExecution(pid).setCompleted()) + val outputMetrics = op.outputPorts.keys.map { pid => + val count = resourceConfig.portConfigs.collectFirst { + case (gpid, cfg: OutputPortConfig) if gpid == GlobalPortIdentity(op.id, pid) => + cfg.cachedTupleCount.getOrElse(0L) + }.getOrElse(0L) + PortTupleMetricsMapping(pid, TupleMetrics(count, 0L)) + }.toSeq + val inputMetrics = op.inputPorts.keys + .map(pid => PortTupleMetricsMapping(pid, TupleMetrics(0L, 0L))) + .toSeq + val stats = WorkerStatistics( + inputMetrics, + outputMetrics, + dataProcessingTime = 0L, + controlProcessingTime = 0L, + idleTime = 0L + ) + workerExecution.update(System.nanoTime(), WorkerState.COMPLETED, stats) + } + } + recordCachedOutputPortResults(resourceConfig) + asyncRPCClient.sendToClient( + ExecutionStatsUpdate(workflowExecution.getAllRegionExecutionsStats) + ) + setPhase(Completed) + } + + private def recordCachedOutputPortResults(resourceConfig: ResourceConfig): Unit = { + resourceConfig.portConfigs.collect { case (gpid, cfg: OutputPortConfig) => + val storageUri = cfg.storageURI + WorkflowExecutionsResource.insertOperatorPortResultUri( + eid = executionId, + globalPortId = gpid, + uri = storageUri + ) + } + } + /** * Sync the status of `RegionExecution` and transition this coordinator's phase to `Completed` only when the * coordinator is currently in `ExecutingNonDependeePortsPhase` and all the ports of this region are completed. @@ -225,6 +264,10 @@ class RegionExecutionCoordinator( } private def executeDependeePortPhase(): Future[Unit] = { + if (region.cached) { + // Cached region short-circuits all execution. + return Future.Unit + } setPhase(ExecutingDependeePortsPhase) if (!region.getOperators.exists(_.dependeeInputs.nonEmpty)) { // Skip to the next phase when there are no dependee input ports @@ -351,6 +394,9 @@ class RegionExecutionCoordinator( operators: Set[PhysicalOp], resourceConfig: ResourceConfig ): Future[Seq[EmptyReturn]] = { + if (region.cached) { + return Future.value(Seq.empty) + } Future .collect( operators @@ -459,6 +505,9 @@ class RegionExecutionCoordinator( } private def connectChannels(links: Set[PhysicalLink]): Future[Seq[EmptyReturn]] = { + if (region.cached) { + return Future.value(Seq.empty) + } Future.collect( links.map { link: PhysicalLink => asyncRPCClient.controllerInterface.linkWorkers( @@ -470,6 +519,9 @@ class RegionExecutionCoordinator( } private def openOperators(operators: Set[PhysicalOp]): Future[Seq[EmptyReturn]] = { + if (region.cached) { + return Future.value(Seq.empty) + } Future .collect( operators @@ -489,6 +541,9 @@ class RegionExecutionCoordinator( region: Region, isDependeePhase: Boolean ): Future[Seq[Unit]] = { + if (region.cached) { + return Future.value(Seq.empty) + } asyncRPCClient.sendToClient( ExecutionStatsUpdate( workflowExecution.getAllRegionExecutionsStats diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index 48cbe3440c..72c575ddd3 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -23,7 +23,7 @@ import com.twitter.util.Future import com.typesafe.scalalogging.LazyLogging import org.apache.amber.core.workflow.{GlobalPortIdentity, PhysicalLink} import org.apache.amber.engine.architecture.common.{AkkaActorRefMappingService, AkkaActorService} -import org.apache.amber.engine.architecture.controller.ControllerConfig +import org.apache.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate} import org.apache.amber.engine.architecture.controller.execution.WorkflowExecution import org.apache.amber.engine.common.rpc.AsyncRPCClient @@ -33,7 +33,8 @@ class WorkflowExecutionCoordinator( getNextRegions: () => Set[Region], workflowExecution: WorkflowExecution, controllerConfig: ControllerConfig, - asyncRPCClient: AsyncRPCClient + asyncRPCClient: AsyncRPCClient, + executionId: org.apache.amber.core.virtualidentity.ExecutionIdentity ) extends LazyLogging { private val executedRegions: mutable.ListBuffer[Set[Region]] = mutable.ListBuffer() @@ -76,27 +77,54 @@ class WorkflowExecutionCoordinator( } // All existing regions are completed. Start the next region (if any). - Future - .collect({ - val nextRegions = getNextRegions() - executedRegions.append(nextRegions) - nextRegions - .map(region => { - workflowExecution.initRegionExecution(region) - regionExecutionCoordinators(region.id) = new RegionExecutionCoordinator( - region, - workflowExecution, - asyncRPCClient, - controllerConfig, - actorService, - actorRefService + val nextRegions = getNextRegions() + if (nextRegions.isEmpty) { + if (regionExecutionCoordinators.values.forall(_.isCompleted)) { + asyncRPCClient.sendToClient( + ExecutionStateUpdate(workflowExecution.getState) + ) + } + Future.Unit + } else { + executedRegions.append(nextRegions) + val launches = nextRegions + .map(region => { + workflowExecution.initRegionExecution(region) + regionExecutionCoordinators(region.id) = new RegionExecutionCoordinator( + region, + workflowExecution, + executionId, + asyncRPCClient, + controllerConfig, + actorService, + actorRefService + ) + regionExecutionCoordinators(region.id) + }) + .map(_.syncStatusAndTransitionRegionExecutionPhase()) + .toSeq + Future + .collect(launches) + .unit + .flatMap { _ => + if (regionExecutionCoordinators.values.exists(!_.isCompleted)) { + Future.Unit + } else { + // All launched regions finished immediately (e.g., cached); proceed to next batch. + coordinateRegionExecutors(actorService) + } + } + .map { _ => + if ( + regionExecutionCoordinators.values.forall(_.isCompleted) && + workflowExecution.isCompleted + ) { + asyncRPCClient.sendToClient( + ExecutionStateUpdate(workflowExecution.getState) ) - regionExecutionCoordinators(region.id) - }) - .map(_.syncStatusAndTransitionRegionExecutionPhase()) - .toSeq - }) - .unit + } + } + } } def getRegionOfLink(link: PhysicalLink): Region = { diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/config/PortConfig.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/config/PortConfig.scala index e76651eb10..df230a6170 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/config/PortConfig.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/config/PortConfig.scala @@ -32,7 +32,8 @@ sealed trait PortConfig { } /** An output port requires exactly one materialization URI. */ -final case class OutputPortConfig(storageURI: URI) extends PortConfig { +final case class OutputPortConfig(storageURI: URI, cachedTupleCount: Option[Long] = None) + extends PortConfig { override val storageURIs: List[URI] = List(storageURI) } diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala index 246c4e4967..926b03b824 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala @@ -330,7 +330,7 @@ class ExecutionResultService( evt.state == COMPLETED || evt.state == FAILED || evt.state == KILLED || evt.state == TERMINATED ) { logger.info("Workflow execution terminated. Stop update results.") - if (resultUpdateCancellable.cancel() || resultUpdateCancellable.isCancelled) { + if (resultUpdateCancellable == null|| resultUpdateCancellable.cancel() || resultUpdateCancellable.isCancelled) { // immediately perform final update onResultUpdate(executionId, physicalPlan) } diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala index 3d50a9eb89..cf7f999d5f 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala @@ -179,7 +179,7 @@ class WorkflowExecutionService( }) .onSuccess(resp => executionStateStore.metadataStore.updateState(metadataStore => - if (metadataStore.state != FAILED) { + if (metadataStore.state != FAILED && metadataStore.state != COMPLETED) { updateWorkflowState(resp.workflowState, metadataStore) } else { metadataStore
