This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 0c0f2a7734bacd52a7a96bdf19ace75a66571ee5 Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Feb 9 17:39:00 2026 -0800 feat(cache): seperate cache reuse logic outside of Pasta. --- .../controller/CacheReusePreSchedulingStep.scala | 315 +++++++++++ .../controller/WorkflowScheduler.scala | 28 +- .../scheduling/CostBasedScheduleGenerator.scala | 585 +++++++-------------- .../architecture/scheduling/CostEstimator.scala | 119 ++++- .../scheduling/PreSchedulingHints.scala | 42 ++ docs/operator-port-cache.md | 22 +- 6 files changed, 692 insertions(+), 419 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala new file mode 100644 index 0000000000..122eadf2da --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/CacheReusePreSchedulingStep.scala @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.engine.architecture.controller + +import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity +import org.apache.texera.amber.core.workflow.{ + CachedOutput, + GlobalPortIdentity, + PhysicalLink, + PhysicalPlan, + WorkflowContext, + WorkflowSettings +} +import org.apache.texera.amber.engine.architecture.scheduling.config.{OutputPortConfig, ResourceConfig} +import org.apache.texera.amber.engine.architecture.scheduling.{ + PreSchedulingHints, + Region, + RegionIdentity +} +import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde +import org.jgrapht.Graph +import org.jgrapht.alg.connectivity.BiconnectivityInspector + +import java.net.URI +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Pre-scheduling cache step for forced-cache-reuse MVP semantics. + * + * This step runs before CostBasedScheduleGenerator and only prepares planning inputs: + * residual planning plan, workflow settings override, precomputed URI/config hints, and leading + * skipped regions. The final schedule is still generated by CostBasedScheduleGenerator. + */ +object CacheReusePreSchedulingStep { + + /** + * Output of the pre-scheduling phase consumed by CostBasedScheduleGenerator. + */ + case class CacheReusePreSchedulingResult( + planningPhysicalPlan: PhysicalPlan, + planningWorkflowSettings: WorkflowSettings, + planningHints: PreSchedulingHints, + leadingRegions: Set[Region] + ) + + /** + * Deterministic requiredness result under forced-cache semantics. + */ + private case class RequirednessAnalysis( + requiredSeedPorts: Set[GlobalPortIdentity], + executeOperators: Set[PhysicalOpIdentity], + freshRequiredOutputPorts: Set[GlobalPortIdentity], + cacheHitRequiredOutputPorts: Set[GlobalPortIdentity], + freshRequiredLinks: Set[PhysicalLink], + cacheFedLinks: Set[PhysicalLink] + ) + + /** + * Prepare cache-aware planning inputs for CostBasedScheduleGenerator. 
+ */ + def prepare( + workflowContext: WorkflowContext, + physicalPlan: PhysicalPlan + ): CacheReusePreSchedulingResult = { + val cachedOutputsByPort = workflowContext.workflowSettings.cachedOutputs.map { /* cache entries are keyed by serialized GlobalPortIdentity strings; decode each key */ + case (serializedId, cachedOutput) => + GlobalPortIdentitySerde.deserializeFromString(serializedId) -> cachedOutput + } + + if (cachedOutputsByPort.isEmpty) { /* nothing cached: pass the original plan and settings through with empty hints and no leading regions */ + return CacheReusePreSchedulingResult( + planningPhysicalPlan = physicalPlan, + planningWorkflowSettings = workflowContext.workflowSettings, + planningHints = PreSchedulingHints(), + leadingRegions = Set.empty + ) + } + + val analysis = analyzeRequiredness(physicalPlan, workflowContext, cachedOutputsByPort) /* decide which operators execute and split required ports/links into fresh vs. cache-hit */ + val skipRegions = buildSkipRegions(physicalPlan, analysis, cachedOutputsByPort) /* operators not selected for execution; returned below as leadingRegions */ + val residualPlan = + buildResidualPlan(physicalPlan, analysis.executeOperators, analysis.freshRequiredLinks) /* NOTE(review): when every required output is a cache hit, executeOperators is empty and this plan has no operators; confirm CostBasedScheduleGenerator handles an empty plan */ + val planningWorkflowSettings = workflowContext.workflowSettings.copy( /* only requested ports without a cache hit still need fresh storage */ + outputPortsNeedingStorage = + analysis.requiredSeedPorts.intersect(analysis.freshRequiredOutputPorts) + ) + val planningHints = buildPlanningHints(residualPlan, analysis, cachedOutputsByPort) /* cached-URI bindings for ports of residual operators */ + + CacheReusePreSchedulingResult( + planningPhysicalPlan = residualPlan, + planningWorkflowSettings = planningWorkflowSettings, + planningHints = planningHints, + leadingRegions = skipRegions + ) + } + + /** + * Backward requiredness propagation with forced cache reuse: + * - cache hit required outputs do not force execution, + * - cache miss required outputs force execution and upstream propagation.
+ */ + private def analyzeRequiredness( + plan: PhysicalPlan, + workflowContext: WorkflowContext, + cachedOutputsByPort: Map[GlobalPortIdentity, CachedOutput] + ): RequirednessAnalysis = { + val requiredSeeds = /* ports listed in outputPortsNeedingStorage whose operator exists in this plan */ + workflowContext.workflowSettings.outputPortsNeedingStorage + .filter(pid => plan.operators.exists(_.id == pid.opId)) + + val requiredOutputPorts = mutable.Set[GlobalPortIdentity]() + requiredOutputPorts ++= requiredSeeds + val executeOperators = mutable.Set[PhysicalOpIdentity]() + + var changed = true /* repeat full passes until no execute operator or required port is added (fixed point) */ + while (changed) { + changed = false + plan.topologicalIterator().foreach { opId => /* NOTE(review): requirements flow from an operator to its upstream producers, so a reverse topological order would likely converge in a single pass */ + val op = plan.getOperator(opId) + val requiredOutputsOfOp = + op.outputPorts.keys + .map(portId => GlobalPortIdentity(op.id, portId)) + .filter(requiredOutputPorts.contains) + .toSet + if (requiredOutputsOfOp.nonEmpty) { + val hasCacheMiss = requiredOutputsOfOp.exists(pid => !cachedOutputsByPort.contains(pid)) /* one uncached required output is enough to force execution */ + if (hasCacheMiss) { + if (!executeOperators.contains(opId)) { + executeOperators += opId + changed = true + } + plan.getUpstreamPhysicalLinks(opId).foreach { link => /* an executed operator needs every upstream output that feeds it */ + val upstreamOutput = GlobalPortIdentity(link.fromOpId, link.fromPortId) + if (!requiredOutputPorts.contains(upstreamOutput)) { + requiredOutputPorts += upstreamOutput + changed = true + } + } + } + } + } + } + + val freshRequiredOutputPorts = + requiredOutputPorts.filter(pid => !cachedOutputsByPort.contains(pid)).toSet + val cacheHitRequiredOutputPorts = + requiredOutputPorts.filter(cachedOutputsByPort.contains).toSet + + val linksIntoExecute = plan.links.filter(link => executeOperators.contains(link.toOpId)) /* split links into executed operators by whether their source port is fresh or a cache hit */ + val freshRequiredLinks = linksIntoExecute + .filter(link => + freshRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, link.fromPortId)) + ) + .toSet + val cacheFedLinks = linksIntoExecute + .filter(link => + cacheHitRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, link.fromPortId)) + ) + .toSet + + RequirednessAnalysis( + requiredSeedPorts = requiredSeeds, +
executeOperators = executeOperators.toSet, + freshRequiredOutputPorts = freshRequiredOutputPorts, + cacheHitRequiredOutputPorts = cacheHitRequiredOutputPorts, + freshRequiredLinks = freshRequiredLinks, + cacheFedLinks = cacheFedLinks + ) + } + + /** + * Build the residual plan: remove every link not in freshRequiredLinks (e.g. cache-fed links), then keep only the execute operators. + */ + private def buildResidualPlan( + plan: PhysicalPlan, + executeOperators: Set[PhysicalOpIdentity], + freshRequiredLinks: Set[PhysicalLink] + ): PhysicalPlan = { + val linksToRemove = plan.links.diff(freshRequiredLinks) + val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => acc.removeLink(link)) + linksPrunedPlan.getSubPlan(executeOperators) + } + + /** + * Prepare planning hints for CostBasedScheduleGenerator: cache-hit output ports of residual operators are bound to their cached URI with materialize = false, and input ports of residual operators fed by cache-fed links are pre-bound to the cached result URIs. + */ + private def buildPlanningHints( + residualPlan: PhysicalPlan, + analysis: RequirednessAnalysis, + cachedOutputsByPort: Map[GlobalPortIdentity, CachedOutput] + ): PreSchedulingHints = { + val residualOps = residualPlan.operators.map(_.id).toSet + + val outputPortConfigOverrides = analysis.cacheHitRequiredOutputPorts /* bind to the cached URI instead of writing a new result */ + .filter(pid => residualOps.contains(pid.opId)) + .flatMap { pid => + cachedOutputsByPort.get(pid).map { cached => + pid -> OutputPortConfig( + storageURI = cached.resultUri, + cachedTupleCount = cached.tupleCount, + materialize = false + ) + } + } + .toMap + + val inputPortUriOverrides = analysis.cacheFedLinks /* an input port may collect several cached URIs, one per cache-fed link */ + .filter(link => residualOps.contains(link.toOpId)) + .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) { + case (acc, link) => + val inputPort = GlobalPortIdentity(link.toOpId, link.toPortId, input = true) + val outputPort = GlobalPortIdentity(link.fromOpId, link.fromPortId) + cachedOutputsByPort.get(outputPort) match { + case Some(cached) => + val uris = acc.getOrElse(inputPort, List.empty) :+ cached.resultUri + acc.updated(inputPort, uris) + case None => /* expected unreachable when analysis was built from the same cachedOutputsByPort */ + acc + } + } + + PreSchedulingHints(outputPortConfigOverrides, inputPortUriOverrides) + } + + /** + * Build skipped regions (outside Pasta
scope) with reuse-only output-port bindings. Skipped operators are all operators outside analysis.executeOperators; each connected group of them becomes one cached region. + */ + private def buildSkipRegions( + plan: PhysicalPlan, + analysis: RequirednessAnalysis, + cachedOutputsByPort: Map[GlobalPortIdentity, CachedOutput] + ): Set[Region] = { + val skipOperators = plan.operators.map(_.id).diff(analysis.executeOperators) + if (skipOperators.isEmpty) { + return Set.empty + } + + val skipInternalLinks = plan.links /* keep only links whose endpoints are both skipped */ + .filter(link => skipOperators.contains(link.fromOpId) && skipOperators.contains(link.toOpId)) + val linksToRemove = plan.links.diff(skipInternalLinks) + val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => acc.removeLink(link)) + val skipPlan = linksPrunedPlan.getSubPlan(skipOperators) + + createConnectedRegions(skipPlan).map { region => + val reuseOnlyOutputConfigs = analysis.cacheHitRequiredOutputPorts /* required cache-hit ports owned by this region, served from cache without re-materialization */ + .filter(pid => region.physicalOps.exists(_.id == pid.opId)) + .flatMap { outputPort => + cachedOutputsByPort.get(outputPort).map { cached => + outputPort -> OutputPortConfig( + storageURI = cached.resultUri, + cachedTupleCount = cached.tupleCount, + materialize = false + ) + } + } + .toMap + region.copy( /* createConnectedRegions already sets cached = true; this copy attaches the reuse-only port configs */ + cached = true, + resourceConfig = Some(ResourceConfig(portConfigs = reuseOnlyOutputConfigs)) + ) + } + } + + /** + * Region skeleton partitioning without materialized-edge cuts.
+ */ + private def createConnectedRegions(plan: PhysicalPlan): Set[Region] = { + val connectedComponents: Set[Graph[PhysicalOpIdentity, PhysicalLink]] = + new BiconnectivityInspector[PhysicalOpIdentity, PhysicalLink]( + plan.dag + ).getConnectedComponents.asScala.toSet + + connectedComponents.zipWithIndex.map { + case (connectedSubDAG, idx) => + val operators: Set[PhysicalOpIdentity] = connectedSubDAG.vertexSet().asScala.toSet + val links: Set[PhysicalLink] = operators + .flatMap(opId => plan.getUpstreamPhysicalLinks(opId) ++ plan.getDownstreamPhysicalLinks(opId)) + .filter(link => operators.contains(link.fromOpId)) + val physicalOps = operators.map(plan.getOperator) + val ports: Set[GlobalPortIdentity] = physicalOps.flatMap { op => + op.inputPorts.keys + .map(inputPortId => GlobalPortIdentity(op.id, inputPortId, input = true)) + .toSet ++ + op.outputPorts.keys.map(outputPortId => GlobalPortIdentity(op.id, outputPortId)).toSet + } + Region( + id = RegionIdentity(idx.toLong), + physicalOps = physicalOps, + physicalLinks = links, + ports = ports, + resourceConfig = None, + cached = true + ) + } + } + +} diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index 9dcf3ad4bf..a4cc8eded7 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -40,16 +40,24 @@ class WorkflowScheduler( * Update the schedule to be executed, based on the given physicalPlan. */ def updateSchedule(physicalPlan: PhysicalPlan): Unit = { - // generate a schedule using a region plan generator. - val (generatedSchedule, updatedPhysicalPlan) = - // CostBasedRegionPlanGenerator considers costs to try to find an optimal plan. 
- new CostBasedScheduleGenerator( - workflowContext, - physicalPlan, - actorId - ).generate() - this.schedule = generatedSchedule - this.physicalPlan = updatedPhysicalPlan + // Keep the full immutable physical plan on scheduler/controller side. + this.physicalPlan = physicalPlan + val preScheduling = CacheReusePreSchedulingStep.prepare( + workflowContext, + physicalPlan + ) + val planningWorkflowContext = new WorkflowContext( + workflowContext.workflowId, + workflowContext.executionId, + preScheduling.planningWorkflowSettings + ) + this.schedule = new CostBasedScheduleGenerator( + planningWorkflowContext, + preScheduling.planningPhysicalPlan, + actorId, + preScheduling.planningHints, + preScheduling.leadingRegions + ).generate()._1 } def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next() diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala index fc4a54c0f9..19ef60c91c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala @@ -30,7 +30,6 @@ import org.apache.texera.amber.engine.architecture.scheduling.config.{ ResourceConfig } import org.apache.texera.amber.engine.common.AmberLogging -import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde import org.jgrapht.Graph import org.jgrapht.alg.connectivity.BiconnectivityInspector import org.jgrapht.graph.{DirectedAcyclicGraph, DirectedPseudograph} @@ -48,7 +47,9 @@ import scala.util.{Failure, Success, Try} class CostBasedScheduleGenerator( workflowContext: WorkflowContext, initialPhysicalPlan: PhysicalPlan, - val actorId: ActorVirtualIdentity + val actorId: ActorVirtualIdentity, + planningHints: PreSchedulingHints = 
PreSchedulingHints(), + leadingRegions: Set[Region] = Set.empty ) extends ScheduleGenerator( workflowContext, initialPhysicalPlan @@ -63,19 +64,6 @@ class CostBasedScheduleGenerator( numStatesExplored: Int = 0 ) - /** - * Deterministic cache-aware propagation result under Assumption III (forced cache use). - */ - private case class RequirednessAnalysis( - requiredSeedPorts: Set[GlobalPortIdentity], - requiredOutputPorts: Set[GlobalPortIdentity], - executeOperators: Set[PhysicalOpIdentity], - freshRequiredOutputPorts: Set[GlobalPortIdentity], - cacheHitRequiredOutputPorts: Set[GlobalPortIdentity], - freshRequiredLinks: Set[PhysicalLink], - cacheFedLinks: Set[PhysicalLink] - ) - private val costEstimator = new DefaultCostEstimator( workflowContext = workflowContext, @@ -83,36 +71,6 @@ class CostBasedScheduleGenerator( actorId = actorId ) - private val cachedOutputsByPort: Map[GlobalPortIdentity, CachedOutput] = - workflowContext.workflowSettings.cachedOutputs.map { - case (serializedId, cachedOutput) => - GlobalPortIdentitySerde.deserializeFromString(serializedId) -> cachedOutput - } - - /** - * Search-time planning bindings applied during region construction. - * - * @param visibleFreshOutputPorts visible/sink output ports that must be freshly materialized. - * @param reuseOnlyOutputsByPort output ports that are required but must be served from cache (no rematerialization). - * @param cacheFedInputUrisByPort input ports pre-bound to cached upstream URIs. 
- */ - private case class PlanningBindings( - visibleFreshOutputPorts: Set[GlobalPortIdentity], - reuseOnlyOutputsByPort: Map[GlobalPortIdentity, CachedOutput], - cacheFedInputUrisByPort: Map[GlobalPortIdentity, List[URI]] - ) - - private object PlanningBindings { - val empty: PlanningBindings = PlanningBindings( - visibleFreshOutputPorts = Set.empty, - reuseOnlyOutputsByPort = Map.empty, - cacheFedInputUrisByPort = Map.empty - ) - } - - private var activePlanningPlan: PhysicalPlan = initialPhysicalPlan - private var activePlanningBindings: PlanningBindings = PlanningBindings.empty - private case class CostEstimatorMemoKey( physicalOpIds: Set[PhysicalOpIdentity], physicalLinkIds: Set[PhysicalLink], @@ -126,269 +84,102 @@ class CostBasedScheduleGenerator( def generate(): (Schedule, PhysicalPlan) = { val startTime = System.nanoTime() - val analysis = analyzeRequiredness(initialPhysicalPlan) - - val executeRegionDAG = new DirectedAcyclicGraph[Region, RegionLink](classOf[RegionLink]) - if (analysis.executeOperators.nonEmpty) { - val residualPlan = buildResidualPlan( - initialPhysicalPlan, - analysis.executeOperators, - analysis.freshRequiredLinks - ) - val residualBindings = buildResidualPlanningBindings(residualPlan, analysis) - val searchResult = searchOnPlan(residualPlan, residualBindings) - searchResult.regionDAG.vertexSet().forEach(region => executeRegionDAG.addVertex(region)) - searchResult.regionDAG - .edgeSet() - .forEach(edge => - executeRegionDAG.addEdge( - searchResult.regionDAG.getEdgeSource(edge), - searchResult.regionDAG.getEdgeTarget(edge), - edge - ) - ) - } - - val skipRegions = buildSkipRegions(initialPhysicalPlan, analysis) - val finalRegionPlan = buildFinalRegionPlan(executeRegionDAG, skipRegions) + val regionDAG = createRegionDAG() val totalRPGTime = System.nanoTime() - startTime - val schedule = generateScheduleFromRegionPlan(finalRegionPlan) + val regionPlan = RegionPlan( + regions = regionDAG.iterator().asScala.toSet, + regionLinks = 
regionDAG.edgeSet().asScala.toSet + ) + val executeOnlySchedule = generateScheduleFromRegionPlan(regionPlan) + val schedule = + if (leadingRegions.nonEmpty) composeWithLeadingRegions(executeOnlySchedule, leadingRegions) + else executeOnlySchedule logger.info( s"WID: ${workflowContext.workflowId.id}, EID: ${workflowContext.executionId.id}, total RPG time: " + s"${totalRPGTime / 1e6} ms." ) ( schedule, - initialPhysicalPlan + physicalPlan ) } /** - * Computes deterministic requiredness and execute/skip decisions under Assumption III (forced cache use). + * Merge externally-prepared leading regions (e.g., skipped cached regions) ahead of the schedule + * generated by Pasta over the planning physical plan. */ - private def analyzeRequiredness(plan: PhysicalPlan): RequirednessAnalysis = { - val requiredSeeds = - workflowContext.workflowSettings.outputPortsNeedingStorage - .filter(pid => plan.operators.exists(_.id == pid.opId)) - - val requiredOutputPorts = mutable.Set[GlobalPortIdentity]() - requiredOutputPorts ++= requiredSeeds - val executeOperators = mutable.Set[PhysicalOpIdentity]() - - var changed = true - while (changed) { - changed = false - plan.topologicalIterator().foreach { opId => - val op = plan.getOperator(opId) - val requiredOutputsOfOp = - op.outputPorts.keys - .map(portId => GlobalPortIdentity(op.id, portId)) - .filter(requiredOutputPorts.contains) - .toSet - if (requiredOutputsOfOp.nonEmpty) { - val hasCacheMiss = requiredOutputsOfOp.exists(pid => !cachedOutputsByPort.contains(pid)) - if (hasCacheMiss) { - if (!executeOperators.contains(opId)) { - executeOperators += opId - changed = true - } - plan.getUpstreamPhysicalLinks(opId).foreach { link => - val upstreamOutput = GlobalPortIdentity(link.fromOpId, link.fromPortId) - if (!requiredOutputPorts.contains(upstreamOutput)) { - requiredOutputPorts += upstreamOutput - changed = true - } - } - } - } - } - } - - val freshRequiredOutputPorts = - requiredOutputPorts.filter(pid => 
!cachedOutputsByPort.contains(pid)).toSet - val cacheHitRequiredOutputPorts = - requiredOutputPorts.filter(cachedOutputsByPort.contains).toSet - val linksIntoExecute = plan.links.filter(link => executeOperators.contains(link.toOpId)) - val freshRequiredLinks = linksIntoExecute - .filter(link => - freshRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, link.fromPortId)) - ) - .toSet - val cacheFedLinks = linksIntoExecute - .filter(link => - cacheHitRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, link.fromPortId)) - ) - .toSet - - RequirednessAnalysis( - requiredSeedPorts = requiredSeeds, - requiredOutputPorts = requiredOutputPorts.toSet, - executeOperators = executeOperators.toSet, - freshRequiredOutputPorts = freshRequiredOutputPorts, - cacheHitRequiredOutputPorts = cacheHitRequiredOutputPorts, - freshRequiredLinks = freshRequiredLinks, - cacheFedLinks = cacheFedLinks - ) - } - - /** - * Builds a residual plan containing only execute operators and fresh-required dependencies. - */ - private def buildResidualPlan( - plan: PhysicalPlan, - executeOperators: Set[PhysicalOpIdentity], - freshRequiredLinks: Set[PhysicalLink] - ): PhysicalPlan = { - val linksToRemove = plan.links.diff(freshRequiredLinks) - val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => acc.removeLink(link)) - linksPrunedPlan.getSubPlan(executeOperators) - } - - /** - * Builds the search-time binding context for residual Pasta planning. - * All bindings are prepared before search and consumed directly by createRegions. 
- */ - private def buildResidualPlanningBindings( - residualPlan: PhysicalPlan, - analysis: RequirednessAnalysis - ): PlanningBindings = { - val residualOps = residualPlan.operators.map(_.id).toSet - - val visibleFreshOutputPorts = analysis.requiredSeedPorts - .intersect(analysis.freshRequiredOutputPorts) - .filter(pid => residualOps.contains(pid.opId)) - - val reuseOnlyOutputsByPort = analysis.cacheHitRequiredOutputPorts - .filter(pid => residualOps.contains(pid.opId)) - .flatMap(pid => cachedOutputsByPort.get(pid).map(cached => pid -> cached)) - .toMap - - val cacheFedInputUrisByPort = analysis.cacheFedLinks - .filter(link => residualOps.contains(link.toOpId)) - .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) { - case (acc, link) => - val inputPort = GlobalPortIdentity(link.toOpId, link.toPortId, input = true) - val outputPort = GlobalPortIdentity(link.fromOpId, link.fromPortId) - cachedOutputsByPort.get(outputPort) match { - case Some(cached) => - val uris = acc.getOrElse(inputPort, List.empty) :+ cached.resultUri - acc.updated(inputPort, uris) - case None => - acc - } - } - - PlanningBindings( - visibleFreshOutputPorts = visibleFreshOutputPorts, - reuseOnlyOutputsByPort = reuseOnlyOutputsByPort, - cacheFedInputUrisByPort = cacheFedInputUrisByPort - ) - } + private def composeWithLeadingRegions( + executeOnlySchedule: Schedule, + leadingRegions: Set[Region] + ): Schedule = { + val executeLevels = drainScheduleLevels(executeOnlySchedule) + val leadingOrdered = leadingRegions.toSeq.sortBy(_.id.id) + val executeOrdered = executeLevels.flatten.toSet.toSeq.sortBy((region: Region) => region.id.id) + val orderedRegions = leadingOrdered ++ executeOrdered + val remappedRegionByRegion = orderedRegions.zipWithIndex.map { + case (region, idx) => + region -> region.copy(id = RegionIdentity(idx.toLong)) + }.toMap - /** - * Runs the original Pasta search on a provided plan with precomputed planning bindings. 
- */ - private def searchOnPlan( - plan: PhysicalPlan, - planningBindings: PlanningBindings - ): SearchResult = { - val oldPlan = activePlanningPlan - val oldBindings = activePlanningBindings - try { - activePlanningPlan = plan - activePlanningBindings = planningBindings - costEstimatorMemoization.clear() - runSearchWithTimeout() - } finally { - activePlanningPlan = oldPlan - activePlanningBindings = oldBindings + val levelSets = mutable.Map.empty[Int, Set[Region]] + var nextLevel = 0 + if (leadingOrdered.nonEmpty) { + levelSets(nextLevel) = leadingOrdered.map(remappedRegionByRegion).toSet + nextLevel += 1 } - } - /** - * Builds skip regions over operators excluded from residual Pasta planning. - */ - private def buildSkipRegions( - plan: PhysicalPlan, - analysis: RequirednessAnalysis - ): Set[Region] = { - val skipOperators = plan.operators.map(_.id).diff(analysis.executeOperators) - if (skipOperators.isEmpty) { - return Set.empty - } - val skipInternalLinks = plan.links - .filter(link => skipOperators.contains(link.fromOpId) && skipOperators.contains(link.toOpId)) - val linksToRemove = plan.links.diff(skipInternalLinks) - val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => acc.removeLink(link)) - val skipPlan = linksPrunedPlan.getSubPlan(skipOperators) - val skipRegionSkeletons = createRegions(skipPlan, Set.empty) - skipRegionSkeletons.map { region => - val reuseOnlyOutputConfigs = analysis.cacheHitRequiredOutputPorts - .filter(pid => region.physicalOps.exists(_.id == pid.opId)) - .flatMap { outputPort => - cachedOutputsByPort.get(outputPort).map { cached => - outputPort -> OutputPortConfig( - storageURI = cached.resultUri, - cachedTupleCount = cached.tupleCount, - materialize = false - ) - } - } - .toMap - region.copy( - cached = true, - resourceConfig = Some(ResourceConfig(portConfigs = reuseOnlyOutputConfigs)) - ) + executeLevels.foreach { level => + if (level.nonEmpty) { + levelSets(nextLevel) = level.map(remappedRegionByRegion) + nextLevel += 1 
+ } } + Schedule(levelSets.toMap) } - /** - * Produces the final full region plan by combining skip regions and execute regions - * (from residual Pasta), then reindexing region IDs with skipped regions first. - */ - private def buildFinalRegionPlan( - executeRegionDAG: DirectedAcyclicGraph[Region, RegionLink], - skipRegions: Set[Region] - ): RegionPlan = { - val executeRegions = executeRegionDAG.vertexSet().asScala.toSet - val executeLinks = executeRegionDAG.edgeSet().asScala.toSet - val allRegions = executeRegions ++ skipRegions - val orderedRegions = allRegions.toSeq.sortBy(region => - (if (skipRegions.contains(region)) 0 else 1, region.id.id) - ) - val remappedRegionByRegion = orderedRegions.zipWithIndex.map { - case (region, idx) => region -> region.copy(id = RegionIdentity(idx.toLong)) - }.toMap - val executeRegionById = executeRegions.map(region => region.id -> region).toMap - - val remappedRegions = remappedRegionByRegion.values.toSet - val remappedExecuteLinks = executeLinks.map { link => - val fromRegion = executeRegionById(link.fromRegionId) - val toRegion = executeRegionById(link.toRegionId) - RegionLink( - remappedRegionByRegion(fromRegion).id, - remappedRegionByRegion(toRegion).id - ) + private def drainScheduleLevels(schedule: Schedule): Vector[Set[Region]] = { + val levels = mutable.ArrayBuffer.empty[Set[Region]] + while (schedule.hasNext) { + levels += schedule.next() } - RegionPlan(remappedRegions, remappedExecuteLinks) + levels.toVector } /** - * Partitions a physical plan into Regions and assigns search-time port bindings in two passes. + * Partitions a physical plan into Regions and assigns storage URIs in two passes. * - * Pass 1 assigns output bindings for: - * - outputs of materialized edges in the current search state, - * - visible required outputs that must be freshly produced, - * - reuse-only required outputs that must bind to cached URIs. 
+ * <p><strong>Overview</strong></p> + * <ol> + * <li><strong>Region construction:</strong> + * Remove all materialized edges from the DAG and compute undirected connected + * components. The resulting “Region Graph” may contain directed cycles.</li> + * <li><strong>Pass 1 – Output URIs:</strong> + * For each Region, allocate storage URIs on every output port of materialized edges (plus ports that need stored results or have planning-hint overrides).</li> + * <li><strong>Pass 2 – Input URIs:</strong> + * Re-traverse the same Regions and attach reader URIs on input ports using + * the URIs created in Pass 1 (plus any input-port URI overrides from planning hints).</li> + * </ol> * - * Pass 2 builds input bindings by wiring: - * - materialized-edge reader URIs from pass 1, - * - cache-fed input URIs precomputed during requiredness analysis. + * <p><strong>Why two passes?</strong></p> + * <ul> + * <li>Potential directed cycles in the Region Graph make a topological + * traversal of regions impossible.</li> + * <li>To ensure every output URI exists before its corresponding reader is assigned, + * avoiding “reader before writer” holes, two passes are required.</li> + * </ul> + * + * @param physicalPlan the original physical plan (without materializations) + * @param matEdges edges to be materialized (including blocking edges) + * @return a set of `Region`s whose `ResourceConfig` contains only `URI`s for `PortConfig`s + * (`Partitioning` to be assigned later in `ResourceAllocator`; see `IntermediateInputPortConfig`.)
*/ private def createRegions( physicalPlan: PhysicalPlan, matEdges: Set[PhysicalLink] ): Set[Region] = { + + // Pass 0 – remove materialized edges and create connected components + val matEdgesRemovedDAG: PhysicalPlan = matEdges.foldLeft(physicalPlan)(_.removeLink(_)) val connectedComponents: Set[Graph[PhysicalOpIdentity, PhysicalLink]] = @@ -396,8 +187,12 @@ class CostBasedScheduleGenerator( matEdgesRemovedDAG.dag ).getConnectedComponents.asScala.toSet - val regionsWithOutputBindings: Set[Region] = connectedComponents.zipWithIndex.map { + // Pass 1 – build Regions only output-port storage URIs + + val regionsWithOnlyOutputPortURIs: Set[Region] = connectedComponents.zipWithIndex.map { case (connectedSubDAG, idx) => + // Operators and intra‑region pipelined links + val operators: Set[PhysicalOpIdentity] = connectedSubDAG.vertexSet().asScala.toSet val links: Set[PhysicalLink] = operators @@ -410,6 +205,41 @@ class CostBasedScheduleGenerator( val physicalOps: Set[PhysicalOp] = operators.map(physicalPlan.getOperator) + // Frontend-specified ports that need to be materailized (output ports of "eye-icon" physicalOps) + val outputPortIdsToViewResult: Set[GlobalPortIdentity] = + workflowContext.workflowSettings.outputPortsNeedingStorage + .filter(pid => operators.contains(pid.opId)) + + // Contains both frontend-specified and scheduler-decided ports that require materailizations. 
+ val overrideOutputPortIds: Set[GlobalPortIdentity] = + planningHints.outputPortConfigOverrides.keySet.filter(pid => operators.contains(pid.opId)) + + val outputPortIdsNeedingStorage: Set[GlobalPortIdentity] = + matEdges + .filter(e => operators.contains(e.fromOpId)) + .map(e => GlobalPortIdentity(e.fromOpId, e.fromPortId)) ++ + outputPortIdsToViewResult ++ + overrideOutputPortIds + + // Allocate an URI for each of these output ports + val outputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] = + outputPortIdsNeedingStorage.map { gpid => + val outputConfig = planningHints.outputPortConfigOverrides.getOrElse( + gpid, + OutputPortConfig( + createResultURI( + workflowId = workflowContext.workflowId, + executionId = workflowContext.executionId, + globalPortId = gpid + ) + ) + ) + gpid -> outputConfig + }.toMap + + val resourceConfig = ResourceConfig(portConfigs = outputPortConfigs) + + // Enumerate all ports belonging to the Region val ports: Set[GlobalPortIdentity] = physicalOps.flatMap { op => op.inputPorts.keys .map(inputPortId => GlobalPortIdentity(op.id, inputPortId, input = true)) @@ -418,102 +248,87 @@ class CostBasedScheduleGenerator( .toSet } - val outputPortIdsFromMatEdges = matEdges - .filter(edge => operators.contains(edge.fromOpId)) - .map(edge => GlobalPortIdentity(edge.fromOpId, edge.fromPortId)) - val visibleFreshOutputPortIds = activePlanningBindings.visibleFreshOutputPorts - .filter(portId => operators.contains(portId.opId)) - val reuseOnlyOutputPortIds = activePlanningBindings.reuseOnlyOutputsByPort.keySet - .filter(portId => operators.contains(portId.opId)) - val outputPortIdsNeedingBindings = - outputPortIdsFromMatEdges ++ visibleFreshOutputPortIds ++ reuseOnlyOutputPortIds - - val outputPortConfigs = outputPortIdsNeedingBindings.map { outputPortId => - activePlanningBindings.reuseOnlyOutputsByPort.get(outputPortId) match { - case Some(cachedOutput) => - outputPortId -> OutputPortConfig( - storageURI = cachedOutput.resultUri, - 
cachedTupleCount = cachedOutput.tupleCount, - materialize = false - ) - case None => - outputPortId -> OutputPortConfig( - storageURI = createResultURI( - workflowId = workflowContext.workflowId, - executionId = workflowContext.executionId, - globalPortId = outputPortId - ), - cachedTupleCount = None, - materialize = true - ) - } - }.toMap - - val resourceConfig = - if (outputPortConfigs.nonEmpty) Some(ResourceConfig(portConfigs = outputPortConfigs)) - else None - + // Build the Region skeleton (no input‑port URIs yet) Region( id = RegionIdentity(idx), physicalOps = physicalOps, physicalLinks = links, ports = ports, - resourceConfig = resourceConfig, - cached = false + resourceConfig = Some(resourceConfig) ) } - val regionByOperator = regionsWithOutputBindings - .flatMap(region => region.getOperators.map(op => op.id -> region)) - .toMap - - regionsWithOutputBindings.map { region => - val inputUrisFromMatEdges = matEdges - .filter(edge => region.physicalOps.exists(_.id == edge.toOpId)) - .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) { - case (acc, matEdge) => - val globalInputPortId = - GlobalPortIdentity(matEdge.toOpId, matEdge.toPortId, input = true) - val globalOutputPortId = GlobalPortIdentity(matEdge.fromOpId, matEdge.fromPortId) - val inputReaderUri = regionByOperator(matEdge.fromOpId) - .resourceConfig - .get - .portConfigs(globalOutputPortId) - .storageURIs - .head + // Collect writer‑side configs so we can look them up in Pass 2 + val allOutputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] = + regionsWithOnlyOutputPortURIs + .flatMap(_.resourceConfig) // Seq[ResourceConfig] + .flatMap(_.portConfigs.collect { // PortConfig → OutputPortConfig + case (id, cfg: OutputPortConfig) => id -> cfg + }) + .toMap + + // Pass 2 – add input‑port storage configs (reader URIs) + + regionsWithOnlyOutputPortURIs.map { existingRegion => + // MatEdges that originally connected to the input ports of this region. 
+ val relevantMatEdges: Set[PhysicalLink] = matEdges.filter { matEdge => + existingRegion.getOperators.exists(_.id == matEdge.toOpId) + } + + // Assign storage URIs to input ports of each materialized edge (each input port could have more than one URI) + val inputUrisFromMatEdges: Map[GlobalPortIdentity, List[URI]] = + relevantMatEdges + .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) { (acc, link) => + val globalOutputPortId = GlobalPortIdentity(link.fromOpId, link.fromPortId) + val globalInputPortId = GlobalPortIdentity(link.toOpId, link.toPortId, input = true) + + // Writer‑side URI that must already exist thanks to Pass 1 + val inputReaderURI = allOutputPortConfigs + .getOrElse( + globalOutputPortId, + throw new IllegalStateException( + s"Materialization edge $link: attempting to assign a materialization " + + s"reader URI for input port $globalInputPortId when " + + s"the outout port $globalOutputPortId has not been assigned a URI yet." + ) + ) + .storageURI + + // Group all available URIs of this input port together acc.updated( globalInputPortId, - acc.getOrElse(globalInputPortId, List.empty[URI]) :+ inputReaderUri + acc.getOrElse(globalInputPortId, List.empty[URI]) :+ inputReaderURI ) + } + + val inputUrisFromOverrides: Map[GlobalPortIdentity, List[URI]] = + planningHints.inputPortUriOverrides.filter { case (inputPortId, _) => + existingRegion.getPorts.contains(inputPortId) } - val cacheFedInputUris = activePlanningBindings.cacheFedInputUrisByPort - .filter { case (inputPortId, _) => region.ports.contains(inputPortId) } - .toMap + val mergedInputUris: Map[GlobalPortIdentity, List[URI]] = + (inputUrisFromMatEdges.keySet ++ inputUrisFromOverrides.keySet).map { inputPortId => + inputPortId -> ( + inputUrisFromMatEdges.getOrElse(inputPortId, List.empty[URI]) ++ + inputUrisFromOverrides.getOrElse(inputPortId, List.empty[URI]) + ) + }.toMap - val mergedInputUris = (inputUrisFromMatEdges.keySet ++ cacheFedInputUris.keySet) - .map { inputPortId => - val uris = 
- inputUrisFromMatEdges.getOrElse(inputPortId, List.empty) ++ - cacheFedInputUris.getOrElse(inputPortId, List.empty) - inputPortId -> uris + val inputPortConfigs: Map[GlobalPortIdentity, IntermediateInputPortConfig] = + mergedInputUris.map { + case (inputPortId, uris) => + inputPortId -> IntermediateInputPortConfig(uris) } - .toMap - val inputPortConfigs = mergedInputUris.map { - case (inputPortId, uris) => - inputPortId -> IntermediateInputPortConfig(uris) + val newResourceConfig: Option[ResourceConfig] = existingRegion.resourceConfig match { + case Some(existingConfig) => + Some(ResourceConfig(portConfigs = existingConfig.portConfigs ++ inputPortConfigs)) + case None => + if (inputPortConfigs.nonEmpty) Some(ResourceConfig(portConfigs = inputPortConfigs)) + else None } - val existingPortConfigs = region.resourceConfig.map(_.portConfigs).getOrElse(Map.empty) - val newResourceConfig = - if (inputPortConfigs.nonEmpty || existingPortConfigs.nonEmpty) { - Some(ResourceConfig(portConfigs = existingPortConfigs ++ inputPortConfigs)) - } else { - None - } - - region.copy(resourceConfig = newResourceConfig) + existingRegion.copy(resourceConfig = newResourceConfig) } } @@ -530,7 +345,7 @@ class CostBasedScheduleGenerator( val regionDAG = new DirectedAcyclicGraph[Region, RegionLink](classOf[RegionLink]) val regionGraph = new DirectedPseudograph[Region, RegionLink](classOf[RegionLink]) val opToRegionMap = new mutable.HashMap[PhysicalOpIdentity, Region] - createRegions(activePlanningPlan, matEdges).foreach(region => { + createRegions(physicalPlan, matEdges).foreach(region => { region.getOperators.foreach(op => opToRegionMap(op.id) = region) regionGraph.addVertex(region) regionDAG.addVertex(region) @@ -547,13 +362,17 @@ class CostBasedScheduleGenerator( isAcyclic = false } }) - if (isAcyclic) Left(regionDAG) else Right(regionGraph) + if (isAcyclic) Left(regionDAG) + else Right(regionGraph) } /** - * Runs Pasta search with timeout handling and returns the selected search result. 
+ * Performs a search to generate a region DAG. + * Materializations are added only after the plan is determined to be schedulable. + * + * @return A region DAG. */ - private def runSearchWithTimeout(): SearchResult = { + private def createRegionDAG(): DirectedAcyclicGraph[Region, RegionLink] = { val searchResultFuture: Future[SearchResult] = Future { workflowContext.workflowSettings.executionMode match { case ExecutionMode.MATERIALIZED => @@ -587,7 +406,9 @@ class CostBasedScheduleGenerator( s"WID: ${workflowContext.workflowId.id}, EID: ${workflowContext.executionId.id}, search time: " + s"${searchResult.searchTimeNanoSeconds / 1e6} ms." ) - searchResult + + val regionDAG = searchResult.regionDAG + regionDAG } /** @@ -602,17 +423,17 @@ class CostBasedScheduleGenerator( */ def bottomUpSearch( globalSearch: Boolean = false, - oChains: Boolean = false, - oCleanEdges: Boolean = false, + oChains: Boolean = true, + oCleanEdges: Boolean = true, oEarlyStop: Boolean = true ): SearchResult = { val startTime = System.nanoTime() val originalNonBlockingEdges = if (oCleanEdges) { - activePlanningPlan.getNonBridgeNonBlockingLinks + physicalPlan.getNonBridgeNonBlockingLinks } else { - activePlanningPlan.links.diff( - activePlanningPlan.getBlockingAndDependeeLinks + physicalPlan.links.diff( + physicalPlan.getBlockingAndDependeeLinks ) } // Queue to hold states to be explored, starting with the empty set @@ -643,7 +464,7 @@ class CostBasedScheduleGenerator( } visited.add(currentState) tryConnectRegionDAG( - activePlanningPlan.getBlockingAndDependeeLinks ++ currentState + physicalPlan.getBlockingAndDependeeLinks ++ currentState ) match { case Left(regionDAG) => updateOptimumIfApplicable(regionDAG) @@ -674,12 +495,12 @@ class CostBasedScheduleGenerator( */ def addNeighborStatesToFrontier(): Unit = { val allCurrentMaterializedEdges = - currentState ++ activePlanningPlan.getBlockingAndDependeeLinks + currentState ++ physicalPlan.getBlockingAndDependeeLinks // Generate and enqueue all 
neighbour states that haven't been visited var candidateEdges = originalNonBlockingEdges .diff(currentState) if (oChains) { - val edgesInChainWithMaterializedEdges = activePlanningPlan.maxChains + val edgesInChainWithMaterializedEdges = physicalPlan.maxChains .filter(chain => chain.intersect(allCurrentMaterializedEdges).nonEmpty) .flatten candidateEdges = candidateEdges.diff( @@ -710,7 +531,7 @@ class CostBasedScheduleGenerator( if (filteredNeighborStates.nonEmpty) { val minCostNeighborState = filteredNeighborStates.minBy(neighborState => tryConnectRegionDAG( - activePlanningPlan.getBlockingAndDependeeLinks ++ neighborState + physicalPlan.getBlockingAndDependeeLinks ++ neighborState ) match { case Left(regionDAG) => allocateResourcesAndEvaluateCost(regionDAG) @@ -736,7 +557,7 @@ class CostBasedScheduleGenerator( val startTime = System.nanoTime() val (regionDAG, cost) = - tryConnectRegionDAG(activePlanningPlan.links) match { + tryConnectRegionDAG(physicalPlan.links) match { case Left(dag) => (dag, allocateResourcesAndEvaluateCost(dag)) case Right(_) => ( @@ -765,19 +586,19 @@ class CostBasedScheduleGenerator( */ def topDownSearch( globalSearch: Boolean = false, - oChains: Boolean = false, - oCleanEdges: Boolean = false + oChains: Boolean = true, + oCleanEdges: Boolean = true ): SearchResult = { val startTime = System.nanoTime() // Starting from a state where all non-blocking edges are materialized - val originalSeedState = activePlanningPlan.links.diff( - activePlanningPlan.getBlockingAndDependeeLinks + val originalSeedState = physicalPlan.links.diff( + physicalPlan.getBlockingAndDependeeLinks ) // Chain optimization: an edge in the same chain as a blocking edge should not be materialized val seedStateOptimizedByChainsIfApplicable = if (oChains) { - val edgesInChainWithBlockingEdge = activePlanningPlan.maxChains - .filter(chain => chain.intersect(activePlanningPlan.getBlockingAndDependeeLinks).nonEmpty) + val edgesInChainWithBlockingEdge = physicalPlan.maxChains + 
.filter(chain => chain.intersect(physicalPlan.getBlockingAndDependeeLinks).nonEmpty) .flatten originalSeedState.diff(edgesInChainWithBlockingEdge) } else { @@ -786,7 +607,7 @@ class CostBasedScheduleGenerator( // Clean edge optimization: a clean edge should not be materialized val finalSeedState = if (oCleanEdges) { - seedStateOptimizedByChainsIfApplicable.intersect(activePlanningPlan.getNonBridgeNonBlockingLinks) + seedStateOptimizedByChainsIfApplicable.intersect(physicalPlan.getNonBridgeNonBlockingLinks) } else { seedStateOptimizedByChainsIfApplicable } @@ -806,7 +627,7 @@ class CostBasedScheduleGenerator( val currentState = queue.dequeue() visited.add(currentState) tryConnectRegionDAG( - activePlanningPlan.getBlockingAndDependeeLinks ++ currentState + physicalPlan.getBlockingAndDependeeLinks ++ currentState ) match { case Left(regionDAG) => updateOptimumIfApplicable(regionDAG) @@ -849,7 +670,7 @@ class CostBasedScheduleGenerator( if (unvisitedNeighborStates.nonEmpty) { val minCostNeighborState = unvisitedNeighborStates.minBy(neighborState => tryConnectRegionDAG( - activePlanningPlan.getBlockingAndDependeeLinks ++ neighborState + physicalPlan.getBlockingAndDependeeLinks ++ neighborState ) match { case Left(regionDAG) => allocateResourcesAndEvaluateCost(regionDAG) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala index a37c389606..d86101a1f0 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala @@ -19,17 +19,20 @@ package org.apache.texera.amber.engine.architecture.scheduling +import org.apache.texera.amber.core.storage.DocumentFactory +import org.apache.texera.amber.core.tuple.Tuple import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity 
import org.apache.texera.amber.core.workflow.WorkflowContext import org.apache.texera.amber.engine.architecture.scheduling.DefaultCostEstimator.DEFAULT_OPERATOR_COST -import org.apache.texera.amber.engine.architecture.scheduling.config.{ - InputPortConfig, - IntermediateInputPortConfig, - OutputPortConfig, - ResourceConfig -} +import org.apache.texera.amber.engine.architecture.scheduling.config.ResourceConfig import org.apache.texera.amber.engine.architecture.scheduling.resourcePolicies.ResourceAllocator import org.apache.texera.amber.engine.common.AmberLogging +import org.apache.texera.dao.SqlServer +import org.apache.texera.dao.SqlServer.withTransaction +import org.apache.texera.dao.jooq.generated.Tables.{WORKFLOW_EXECUTIONS, WORKFLOW_VERSION} + +import java.net.URI +import scala.util.{Failure, Success, Try} /** * A cost estimator should estimate a cost of running a region under the given resource constraints as units. @@ -62,27 +65,101 @@ class DefaultCostEstimator( ) extends CostEstimator with AmberLogging { + // Requires mysql database to retrieve execution statistics, otherwise use number of materialized ports as a default. + private val operatorEstimatedTimeOption = Try( + this.getOperatorExecutionTimeInSeconds( + this.workflowContext.workflowId.id + ) + ) match { + case Failure(_) => None + case Success(result) => result + } + + operatorEstimatedTimeOption match { + case None => + logger.info( + s"WID: ${workflowContext.workflowId.id}, EID: ${workflowContext.executionId.id}, " + + s"no past execution statistics available. Using number of materialized output ports as the cost. " + ) + case Some(_) => + } + override def allocateResourcesAndEstimateCost( region: Region, resourceUnits: Int ): (ResourceConfig, Double) = { // Currently the dummy cost from resourceAllocator is discarded. 
val (resourceConfig, _) = resourceAllocator.allocate(region) - val cost = - if (region.cached) { - 0.0 - } else { - val opCost = region.getOperators.size * DEFAULT_OPERATOR_COST - val writeCost = resourceConfig.portConfigs.values.collect { - case _: OutputPortConfig => - 0.5 - }.sum - val readCost = resourceConfig.portConfigs.values.collect { - case _: InputPortConfig => 0.5 - case _: IntermediateInputPortConfig => 0.5 - }.sum - opCost + writeCost + readCost - } + // We use a cost model that does not rely on the resource allocation. + // TODO: Once the ResourceAllocator actually calculates a cost, we can use its calculated cost. + val cost = this.operatorEstimatedTimeOption match { + case Some(operatorEstimatedTime) => + // Use past statistics (wall-clock runtime). We use the execution time of the longest-running + // operator in each region to represent the region's execution time, and use the sum of all the regions' + // execution time as the wall-clock runtime of the workflow. + // This assumes a schedule is a total-order of the regions. + val opExecutionTimes = region.getOperators.map(op => { + operatorEstimatedTime.getOrElse(op.id.logicalOpId.id, DEFAULT_OPERATOR_COST) + }) + val longestRunningOpExecutionTime = opExecutionTimes.max + longestRunningOpExecutionTime + case None => + // Without past statistics (e.g., first execution), we use number of ports needing storage as the cost. + // Each port needing storage has a portConfig. + // This is independent of the schedule / resource allocator. + resourceConfig.portConfigs.size + } (resourceConfig, cost) } + + /** + * Retrieve the latest successful execution to get statistics to calculate costs in DefaultCostEstimator. + * Using the total control processing time plus data processing time of an operator as its cost. + * If no past statistics are available (e.g., first execution), return None. 
+ */ + private def getOperatorExecutionTimeInSeconds( + wid: Long + ): Option[Map[String, Double]] = { + + val uriString: String = withTransaction( + SqlServer + .getInstance() + .createDSLContext() + ) { context => + context + .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI) + .from(WORKFLOW_EXECUTIONS) + .join(WORKFLOW_VERSION) + .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID)) + .where( + WORKFLOW_VERSION.WID + .eq(wid.toInt) + .and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte)) + ) + .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc()) + .limit(1) + .fetchOneInto(classOf[String]) + } + + if (uriString == null || uriString.isEmpty) { + None + } else { + val uri: URI = new URI(uriString) + val document = DocumentFactory.openDocument(uri) + + val maxStats = document._1 + .get() + .foldLeft(Map.empty[String, Double]) { (acc, tuple) => + val record = tuple.asInstanceOf[Tuple] + val operatorId = record.getField(0).asInstanceOf[String] + val dataProcessingTime = record.getField(6).asInstanceOf[Long] + val controlProcessingTime = record.getField(7).asInstanceOf[Long] + val currentMaxTime = acc.getOrElse(operatorId, 0.0) + val newTime = (dataProcessingTime + controlProcessingTime) / 1e9 + acc + (operatorId -> Math.max(currentMaxTime, newTime)) + } + + if (maxStats.isEmpty) None else Some(maxStats) + } + } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/PreSchedulingHints.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/PreSchedulingHints.scala new file mode 100644 index 0000000000..3f73bae62f --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/PreSchedulingHints.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.scheduling + +import org.apache.texera.amber.core.workflow.GlobalPortIdentity +import org.apache.texera.amber.engine.architecture.scheduling.config.OutputPortConfig + +import java.net.URI + +/** + * Optional external planning hints for CostBasedScheduleGenerator. + * + * These hints are generic URI/config overrides and do not encode any cache-specific decisions. + * They are intended to be prepared by a pre-scheduling step and consumed during region creation + * before resource allocation/search evaluation. + * + * @param outputPortConfigOverrides output-port configs to override default writer URI allocation. + * This can be used for reuse-only outputs (`materialize = false`). + * @param inputPortUriOverrides additional reader URIs for input ports, merged with scheduler-derived + * materialized-edge input URIs. + */ +case class PreSchedulingHints( + outputPortConfigOverrides: Map[GlobalPortIdentity, OutputPortConfig] = Map.empty, + inputPortUriOverrides: Map[GlobalPortIdentity, List[URI]] = Map.empty +) diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index ddf99413d6..7e4c27defd 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -9,7 +9,7 @@ Enable **incremental workflow execution** through deterministic cache reuse of o 3. 
**Maintain correctness** via deterministic fingerprinting of upstream sub-DAGs 4. **Preserve physical plan immutability**: skip/execute/cache/fresh are scheduler annotations, not plan mutations -**Key principle (MVP)**: The physical plan remains unchanged. Scheduling is a two-step process in `CostBasedScheduleGenerator`: (1) deterministic requiredness propagation with forced cache use, then (2) original Pasta optimization on the residual fresh-required structure only. +**Key principle (MVP)**: The physical plan remains unchanged. Scheduling is a two-step flow: (1) a controller pre-scheduling step computes deterministic requiredness and cache bindings, then (2) `CostBasedScheduleGenerator` (Pasta) runs on the residual fresh-required structure using those precomputed hints. **Future research goals**: @@ -101,9 +101,12 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research - Convert to serialized form: `Map[String, CachedOutput]` (keyed by serialized `GlobalPortIdentity` to avoid Jackson map key deserialization issues) - Store in `WorkflowSettings.cachedOutputs` and pass to scheduler -### 2. Scheduler Integration (Pasta / CostBasedScheduleGenerator) +### 2. Scheduler Integration (Pre-Step + Pasta) -**Location**: `CostBasedScheduleGenerator` +**Location**: + +- Pre-step: `CacheReusePreSchedulingStep` (controller layer before scheduling) +- Scheduler: `CostBasedScheduleGenerator` (original Pasta search on residual plan) **Inputs**: @@ -113,7 +116,7 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research **MVP Scheduling Flow (Assumption III)**: -1. **Deterministic requiredness propagation** (port/edge aware): +1. **Controller pre-scheduling requiredness propagation** (port/edge aware): - Seed required outports with required sink + visible ports. - For each operator, if any required outport is cache miss, mark operator `Execute`; otherwise `Skip`. 
- For each `Execute` operator, mark all upstream outports on incoming edges as required. @@ -121,11 +124,17 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research 2. **Classify inputs of executing operators**: - `Cache-fed` if upstream required outport has cache hit. - `Fresh-required` if upstream required outport has cache miss. -3. **Residual Pasta optimization**: +3. **Residual planning construction**: - Build residual planning structure containing only `Execute` operators and `Fresh-required` dependencies. + - Build precomputed planning hints: + - output-port config overrides for reuse-only cache-hit required outputs (`materialize=false`), + - input-port URI overrides for cache-fed execute inputs. + - Build skipped (`ToSkip`) leading regions outside Pasta scope. +4. **Residual Pasta optimization**: - Run original Pasta search/regioning/materialization on this residual structure. - Keep blocking-edge constraints within the residual scope. -4. **Assemble final full schedule**: +5. **Assemble final full schedule**: + - Prepend skipped regions before execute regions. - Include residual execute regions (`ToExecute`) and skipped regions (`ToSkip`) in one schedule. - Regions remain homogeneous (`cached=true/false`). - All URI bindings are fixed at planning time. @@ -397,6 +406,7 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - cache-hit required outputs are bound as reuse-only (`materialize=false`), - cache-fed inputs are pre-bound to cached URIs. - No post-search region re-annotation step; resource allocation remains in Pasta search (`allocateResourcesAndEvaluateCost`) and final execute-region input configs stay as `InputPortConfig`. + - Cache-aware orchestration is isolated in `CacheReusePreSchedulingStep` (controller layer before scheduling); `CostBasedScheduleGenerator` stays close to `main` and only consumes generic precomputed planning hints plus leading skipped regions. 
- **Runtime execution**: `RegionExecutionCoordinator` branches on `region.cached` flag: - ToSkip regions: `completeCachedRegion()` records cached operator metrics (numWorkers=0, processingTime=0) without creating workers, propagates cached URIs downstream - ToExecute regions: normal execution path
