This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 707dd632615b9a4a314970a088e68c43faa88c8b Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Feb 9 15:05:33 2026 -0800 feat(cache): introduce forced cache reuse assuming reusing cache is always better. --- .../controller/execution/WorkflowExecution.scala | 18 +- .../promisehandlers/PortCompletedHandler.scala | 36 +- .../QueryWorkerStatisticsHandler.scala | 30 +- .../scheduling/CostBasedScheduleGenerator.scala | 561 ++++++++++++++------- .../scheduling/RegionExecutionCoordinator.scala | 33 +- .../scheduling/config/PortConfig.scala | 14 +- .../texera/web/dao/OperatorPortCacheDao.scala | 3 +- docs/operator-port-cache.md | 107 ++-- 8 files changed, 550 insertions(+), 252 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index dea9b692a4..b806479b89 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -119,10 +119,22 @@ case class WorkflowExecution() { * @throws NoSuchElementException if no `OperatorExecution` is found for the specified operatorId. */ def getLatestOperatorExecution(physicalOpId: PhysicalOpIdentity): OperatorExecution = { - regionExecutions.values.toList + getLatestOperatorExecutionOption(physicalOpId).get + } + + /** + * Returns the latest `OperatorExecution` for a physical operator if it has been initialized. + * + * This is the safe counterpart of `getLatestOperatorExecution` for callers that may traverse + * operators before their region is launched (e.g., full-graph stats queries while execution is still + * progressing through schedule levels). + */ + def getLatestOperatorExecutionOption( + physicalOpId: PhysicalOpIdentity + ): Option[OperatorExecution] = { + regionExecutions.values.toSeq .findLast(regionExecution => regionExecution.hasOperatorExecution(physicalOpId)) - .get - .getOperatorExecution(physicalOpId) + .map(_.getOperatorExecution(physicalOpId)) } def isCompleted: Boolean = getState == WorkflowAggregatedState.COMPLETED diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala index e9439eeba1..abb8b1357a 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala @@ -33,6 +33,7 @@ import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ QueryStatisticsRequest } import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.engine.architecture.scheduling.config.OutputPortConfig import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER import org.apache.texera.amber.util.VirtualIdentityUtils import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource @@ -78,20 +79,27 @@ trait PortCompletedHandler { if (isPortCompleted) { // If this is an output port and materialized, notify client (for cache upsert). if (!msg.input) { - val storageUriOpt = - WorkflowExecutionsResource.getResultUriByPhysicalPortId( - cp.workflowContext.executionId, - globalPortId - ) - storageUriOpt.foreach { uri => - // Prefer runtime statistics over storage reads for tuple counts. - val tupleCount = operatorExecution - .getStats - .operatorStatistics - .outputMetrics - .find(_.portId == msg.portId) - .map(_.tupleMetrics.count) - sendToClient(PortMaterialized(globalPortId, uri, tupleCount)) + val isMaterializedOutput = region.resourceConfig + .flatMap(_.portConfigs.get(globalPortId)) + .collect { case cfg: OutputPortConfig => cfg.materialize } + .getOrElse(false) + + if (isMaterializedOutput) { + val storageUriOpt = + WorkflowExecutionsResource.getResultUriByPhysicalPortId( + cp.workflowContext.executionId, + globalPortId + ) + storageUriOpt.foreach { uri => + // Prefer runtime statistics over storage reads for tuple counts. + val tupleCount = operatorExecution + .getStats + .operatorStatistics + .outputMetrics + .find(_.portId == msg.portId) + .map(_.tupleMetrics.count) + sendToClient(PortMaterialized(globalPortId, uri, tupleCount)) + } } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala index fb725f850e..9f21d10383 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala @@ -105,20 +105,24 @@ trait QueryWorkerStatisticsHandler { if (opFilter.nonEmpty && !opFilter.contains(opId)) { Seq.empty } else { - val exec = cp.workflowExecution.getLatestOperatorExecution(opId) - // Skip completed operators - if (exec.getState == COMPLETED) { - Seq.empty - } else { - // Select all workers for this operator - val workerIds = exec.getWorkerIds - - // Send queryStatistics to each worker and update internal state on reply - workerIds.map { wid => - workerInterface.queryStatistics(EmptyRequest(), wid).map { resp => - collectedResults.addOne((exec.getWorkerExecution(wid), resp, System.nanoTime())) + cp.workflowExecution.getLatestOperatorExecutionOption(opId) match { + // Operator region has not been initialized yet; skip in this polling round. + case None => Seq.empty + case Some(exec) => + // Skip completed operators + if (exec.getState == COMPLETED) { + Seq.empty + } else { + // Select all workers for this operator + val workerIds = exec.getWorkerIds + + // Send queryStatistics to each worker and update internal state on reply + workerIds.map { wid => + workerInterface.queryStatistics(EmptyRequest(), wid).map { resp => + collectedResults.addOne((exec.getWorkerExecution(wid), resp, System.nanoTime())) + } + } } - } } } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala index c603c8ef34..fc4a54c0f9 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala @@ -30,7 +30,6 @@ import org.apache.texera.amber.engine.architecture.scheduling.config.{ ResourceConfig } import org.apache.texera.amber.engine.common.AmberLogging -import org.apache.texera.amber.engine.common.AmberLogging import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde import org.jgrapht.Graph import org.jgrapht.alg.connectivity.BiconnectivityInspector @@ -64,6 +63,19 @@ class CostBasedScheduleGenerator( numStatesExplored: Int = 0 ) + /** + * Deterministic cache-aware propagation result under Assumption III (forced cache use). + */ + private case class RequirednessAnalysis( + requiredSeedPorts: Set[GlobalPortIdentity], + requiredOutputPorts: Set[GlobalPortIdentity], + executeOperators: Set[PhysicalOpIdentity], + freshRequiredOutputPorts: Set[GlobalPortIdentity], + cacheHitRequiredOutputPorts: Set[GlobalPortIdentity], + freshRequiredLinks: Set[PhysicalLink], + cacheFedLinks: Set[PhysicalLink] + ) + private val costEstimator = new DefaultCostEstimator( workflowContext = workflowContext, @@ -77,6 +89,30 @@ class CostBasedScheduleGenerator( GlobalPortIdentitySerde.deserializeFromString(serializedId) -> cachedOutput } + /** + * Search-time planning bindings applied during region construction. + * + * @param visibleFreshOutputPorts visible/sink output ports that must be freshly materialized. + * @param reuseOnlyOutputsByPort output ports that are required but must be served from cache (no rematerialization). + * @param cacheFedInputUrisByPort input ports pre-bound to cached upstream URIs. + */ + private case class PlanningBindings( + visibleFreshOutputPorts: Set[GlobalPortIdentity], + reuseOnlyOutputsByPort: Map[GlobalPortIdentity, CachedOutput], + cacheFedInputUrisByPort: Map[GlobalPortIdentity, List[URI]] + ) + + private object PlanningBindings { + val empty: PlanningBindings = PlanningBindings( + visibleFreshOutputPorts = Set.empty, + reuseOnlyOutputsByPort = Map.empty, + cacheFedInputUrisByPort = Map.empty + ) + } + + private var activePlanningPlan: PhysicalPlan = initialPhysicalPlan + private var activePlanningBindings: PlanningBindings = PlanningBindings.empty + private case class CostEstimatorMemoKey( physicalOpIds: Set[PhysicalOpIdentity], physicalLinkIds: Set[PhysicalLink], @@ -84,173 +120,275 @@ class CostBasedScheduleGenerator( resourceConfig: Option[ResourceConfig] ) - /** - * Classify regions as cached vs must-execute and rebuild port configs accordingly. - * A region is must-execute if it has a visible port without cache, or if it feeds a region - * that needs a materialization without cache (propagated upstream). - */ - private def annotateRegionsWithCacheInfo( - regionDAG: DirectedAcyclicGraph[Region, RegionLink], - matEdges: Set[PhysicalLink], - opToRegionMap: Map[PhysicalOpIdentity, Region] - ): Unit = { - val regions = regionDAG.vertexSet().asScala.toSet - - val needsByRegion = computePortsNeedingStorage(regions, matEdges, opToRegionMap) - val mustExecute = computeMustExecuteRegions(needsByRegion, matEdges, opToRegionMap) - val cachedRegions = regions.diff(mustExecute) - val outputConfigByPort = buildOutputPortConfigs(needsByRegion, cachedRegions) - val inputConfigByPort = buildInputPortConfigs(matEdges, outputConfigByPort) - - // Rebuild each region with refreshed port configs and cached flag. - regions.foreach { region => - val outputCfgs = needsByRegion(region).map(pid => pid -> outputConfigByPort(pid)).toMap - val inputCfgs = inputConfigByPort.filter { case (pid, _) => region.ports.contains(pid) } - val portConfigs = outputCfgs ++ inputCfgs - val newResourceConfig = - if (portConfigs.nonEmpty) Some(ResourceConfig(portConfigs = portConfigs)) - else region.resourceConfig + private val costEstimatorMemoization + : mutable.Map[CostEstimatorMemoKey, (ResourceConfig, Double)] = + new mutable.HashMap() - val newRegion = region.copy( - resourceConfig = newResourceConfig, - cached = cachedRegions.contains(region) + def generate(): (Schedule, PhysicalPlan) = { + val startTime = System.nanoTime() + val analysis = analyzeRequiredness(initialPhysicalPlan) + + val executeRegionDAG = new DirectedAcyclicGraph[Region, RegionLink](classOf[RegionLink]) + if (analysis.executeOperators.nonEmpty) { + val residualPlan = buildResidualPlan( + initialPhysicalPlan, + analysis.executeOperators, + analysis.freshRequiredLinks ) - replaceVertex(regionDAG, region, newRegion) + val residualBindings = buildResidualPlanningBindings(residualPlan, analysis) + val searchResult = searchOnPlan(residualPlan, residualBindings) + searchResult.regionDAG.vertexSet().forEach(region => executeRegionDAG.addVertex(region)) + searchResult.regionDAG + .edgeSet() + .forEach(edge => + executeRegionDAG.addEdge( + searchResult.regionDAG.getEdgeSource(edge), + searchResult.regionDAG.getEdgeTarget(edge), + edge + ) + ) } - } - private def computePortsNeedingStorage( - regions: Set[Region], - matEdges: Set[PhysicalLink], - opToRegionMap: Map[PhysicalOpIdentity, Region] - ): Map[Region, Set[GlobalPortIdentity]] = { - // Ports that require materialization for this run: scheduler-imposed (matEdges) + UI-visible ports. - regions.map { region => - val fromMatEdges = matEdges - .filter(e => opToRegionMap(e.fromOpId) == region) - .map(e => GlobalPortIdentity(e.fromOpId, e.fromPortId)) - val visiblePorts = workflowContext.workflowSettings.outputPortsNeedingStorage - .filter(pid => region.physicalOps.exists(_.id == pid.opId)) - region -> (fromMatEdges ++ visiblePorts) - }.toMap + val skipRegions = buildSkipRegions(initialPhysicalPlan, analysis) + val finalRegionPlan = buildFinalRegionPlan(executeRegionDAG, skipRegions) + val totalRPGTime = System.nanoTime() - startTime + val schedule = generateScheduleFromRegionPlan(finalRegionPlan) + logger.info( + s"WID: ${workflowContext.workflowId.id}, EID: ${workflowContext.executionId.id}, total RPG time: " + + s"${totalRPGTime / 1e6} ms." + ) + ( + schedule, + initialPhysicalPlan + ) } - private def computeMustExecuteRegions( - needsByRegion: Map[Region, Set[GlobalPortIdentity]], - matEdges: Set[PhysicalLink], - opToRegionMap: Map[PhysicalOpIdentity, Region] - ): Set[Region] = { - // Seed with regions that have any needed port lacking cache; then propagate upstream along materialized edges. - val mustExecute = mutable.Set[Region]() - needsByRegion.foreach { - case (region, needs) => - if (needs.exists(pid => !cachedOutputsByPort.contains(pid))) { - mustExecute += region - } - } + /** + * Computes deterministic requiredness and execute/skip decisions under Assumption III (forced cache use). + */ + private def analyzeRequiredness(plan: PhysicalPlan): RequirednessAnalysis = { + val requiredSeeds = + workflowContext.workflowSettings.outputPortsNeedingStorage + .filter(pid => plan.operators.exists(_.id == pid.opId)) + + val requiredOutputPorts = mutable.Set[GlobalPortIdentity]() + requiredOutputPorts ++= requiredSeeds + val executeOperators = mutable.Set[PhysicalOpIdentity]() + var changed = true while (changed) { changed = false - matEdges.foreach { e => - val fromRegion = opToRegionMap(e.fromOpId) - val toRegion = opToRegionMap(e.toOpId) - val outPort = GlobalPortIdentity(e.fromOpId, e.fromPortId) - if ( - mustExecute.contains(toRegion) && - !cachedOutputsByPort.contains(outPort) && - !mustExecute.contains(fromRegion) - ) { - mustExecute += fromRegion - changed = true + plan.topologicalIterator().foreach { opId => + val op = plan.getOperator(opId) + val requiredOutputsOfOp = + op.outputPorts.keys + .map(portId => GlobalPortIdentity(op.id, portId)) + .filter(requiredOutputPorts.contains) + .toSet + if (requiredOutputsOfOp.nonEmpty) { + val hasCacheMiss = requiredOutputsOfOp.exists(pid => !cachedOutputsByPort.contains(pid)) + if (hasCacheMiss) { + if (!executeOperators.contains(opId)) { + executeOperators += opId + changed = true + } + plan.getUpstreamPhysicalLinks(opId).foreach { link => + val upstreamOutput = GlobalPortIdentity(link.fromOpId, link.fromPortId) + if (!requiredOutputPorts.contains(upstreamOutput)) { + requiredOutputPorts += upstreamOutput + changed = true + } + } + } } } } - mustExecute.toSet + + val freshRequiredOutputPorts = + requiredOutputPorts.filter(pid => !cachedOutputsByPort.contains(pid)).toSet + val cacheHitRequiredOutputPorts = + requiredOutputPorts.filter(cachedOutputsByPort.contains).toSet + val linksIntoExecute = plan.links.filter(link => executeOperators.contains(link.toOpId)) + val freshRequiredLinks = linksIntoExecute + .filter(link => + freshRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, link.fromPortId)) + ) + .toSet + val cacheFedLinks = linksIntoExecute + .filter(link => + cacheHitRequiredOutputPorts.contains(GlobalPortIdentity(link.fromOpId, link.fromPortId)) + ) + .toSet + + RequirednessAnalysis( + requiredSeedPorts = requiredSeeds, + requiredOutputPorts = requiredOutputPorts.toSet, + executeOperators = executeOperators.toSet, + freshRequiredOutputPorts = freshRequiredOutputPorts, + cacheHitRequiredOutputPorts = cacheHitRequiredOutputPorts, + freshRequiredLinks = freshRequiredLinks, + cacheFedLinks = cacheFedLinks + ) } - private def buildOutputPortConfigs( - needsByRegion: Map[Region, Set[GlobalPortIdentity]], - cachedRegions: Set[Region] - ): Map[GlobalPortIdentity, OutputPortConfig] = { - // For cached regions, reuse cached URI + tuple count; otherwise allocate new URI with no cached count. - needsByRegion.flatMap { - case (region, ports) => - ports.map { gpid => - val (uri, cachedCount) = - if (cachedRegions.contains(region)) { - val cached = cachedOutputsByPort(gpid) - (cached.resultUri, cached.tupleCount) - } else { - ( - createResultURI( - workflowId = workflowContext.workflowId, - executionId = workflowContext.executionId, - globalPortId = gpid - ), - None - ) - } - gpid -> OutputPortConfig(uri, cachedCount) - } - } + /** + * Builds a residual plan containing only execute operators and fresh-required dependencies. + */ + private def buildResidualPlan( + plan: PhysicalPlan, + executeOperators: Set[PhysicalOpIdentity], + freshRequiredLinks: Set[PhysicalLink] + ): PhysicalPlan = { + val linksToRemove = plan.links.diff(freshRequiredLinks) + val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => acc.removeLink(link)) + linksPrunedPlan.getSubPlan(executeOperators) } - private def buildInputPortConfigs( - matEdges: Set[PhysicalLink], - outputConfigByPort: Map[GlobalPortIdentity, OutputPortConfig] - ): Map[GlobalPortIdentity, IntermediateInputPortConfig] = { - matEdges - .groupBy(e => GlobalPortIdentity(e.toOpId, e.toPortId, input = true)) - .map { - case (inputPid, links) => - val uris = links - .map(link => - outputConfigByPort(GlobalPortIdentity(link.fromOpId, link.fromPortId)).storageURI - ) - .toList - inputPid -> IntermediateInputPortConfig(uris) + /** + * Builds the search-time binding context for residual Pasta planning. + * All bindings are prepared before search and consumed directly by createRegions. + */ + private def buildResidualPlanningBindings( + residualPlan: PhysicalPlan, + analysis: RequirednessAnalysis + ): PlanningBindings = { + val residualOps = residualPlan.operators.map(_.id).toSet + + val visibleFreshOutputPorts = analysis.requiredSeedPorts + .intersect(analysis.freshRequiredOutputPorts) + .filter(pid => residualOps.contains(pid.opId)) + + val reuseOnlyOutputsByPort = analysis.cacheHitRequiredOutputPorts + .filter(pid => residualOps.contains(pid.opId)) + .flatMap(pid => cachedOutputsByPort.get(pid).map(cached => pid -> cached)) + .toMap + + val cacheFedInputUrisByPort = analysis.cacheFedLinks + .filter(link => residualOps.contains(link.toOpId)) + .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) { + case (acc, link) => + val inputPort = GlobalPortIdentity(link.toOpId, link.toPortId, input = true) + val outputPort = GlobalPortIdentity(link.fromOpId, link.fromPortId) + cachedOutputsByPort.get(outputPort) match { + case Some(cached) => + val uris = acc.getOrElse(inputPort, List.empty) :+ cached.resultUri + acc.updated(inputPort, uris) + case None => + acc + } } + + PlanningBindings( + visibleFreshOutputPorts = visibleFreshOutputPorts, + reuseOnlyOutputsByPort = reuseOnlyOutputsByPort, + cacheFedInputUrisByPort = cacheFedInputUrisByPort + ) } - private val costEstimatorMemoization - : mutable.Map[CostEstimatorMemoKey, (ResourceConfig, Double)] = - new mutable.HashMap() + /** + * Runs the original Pasta search on a provided plan with precomputed planning bindings. + */ + private def searchOnPlan( + plan: PhysicalPlan, + planningBindings: PlanningBindings + ): SearchResult = { + val oldPlan = activePlanningPlan + val oldBindings = activePlanningBindings + try { + activePlanningPlan = plan + activePlanningBindings = planningBindings + costEstimatorMemoization.clear() + runSearchWithTimeout() + } finally { + activePlanningPlan = oldPlan + activePlanningBindings = oldBindings + } + } - def generate(): (Schedule, PhysicalPlan) = { - val startTime = System.nanoTime() - val regionDAG = createRegionDAG() - val totalRPGTime = System.nanoTime() - startTime - val regionPlan = RegionPlan( - regions = regionDAG.iterator().asScala.toSet, - regionLinks = regionDAG.edgeSet().asScala.toSet - ) - val schedule = generateScheduleFromRegionPlan(regionPlan) - logger.info( - s"WID: ${workflowContext.workflowId.id}, EID: ${workflowContext.executionId.id}, total RPG time: " + - s"${totalRPGTime / 1e6} ms." - ) - ( - schedule, - physicalPlan + /** + * Builds skip regions over operators excluded from residual Pasta planning. + */ + private def buildSkipRegions( + plan: PhysicalPlan, + analysis: RequirednessAnalysis + ): Set[Region] = { + val skipOperators = plan.operators.map(_.id).diff(analysis.executeOperators) + if (skipOperators.isEmpty) { + return Set.empty + } + val skipInternalLinks = plan.links + .filter(link => skipOperators.contains(link.fromOpId) && skipOperators.contains(link.toOpId)) + val linksToRemove = plan.links.diff(skipInternalLinks) + val linksPrunedPlan = linksToRemove.foldLeft(plan)((acc, link) => acc.removeLink(link)) + val skipPlan = linksPrunedPlan.getSubPlan(skipOperators) + val skipRegionSkeletons = createRegions(skipPlan, Set.empty) + skipRegionSkeletons.map { region => + val reuseOnlyOutputConfigs = analysis.cacheHitRequiredOutputPorts + .filter(pid => region.physicalOps.exists(_.id == pid.opId)) + .flatMap { outputPort => + cachedOutputsByPort.get(outputPort).map { cached => + outputPort -> OutputPortConfig( + storageURI = cached.resultUri, + cachedTupleCount = cached.tupleCount, + materialize = false + ) + } + } + .toMap + region.copy( + cached = true, + resourceConfig = Some(ResourceConfig(portConfigs = reuseOnlyOutputConfigs)) + ) + } + } + + /** + * Produces the final full region plan by combining skip regions and execute regions + * (from residual Pasta), then reindexing region IDs with skipped regions first. + */ + private def buildFinalRegionPlan( + executeRegionDAG: DirectedAcyclicGraph[Region, RegionLink], + skipRegions: Set[Region] + ): RegionPlan = { + val executeRegions = executeRegionDAG.vertexSet().asScala.toSet + val executeLinks = executeRegionDAG.edgeSet().asScala.toSet + val allRegions = executeRegions ++ skipRegions + val orderedRegions = allRegions.toSeq.sortBy(region => + (if (skipRegions.contains(region)) 0 else 1, region.id.id) ) + val remappedRegionByRegion = orderedRegions.zipWithIndex.map { + case (region, idx) => region -> region.copy(id = RegionIdentity(idx.toLong)) + }.toMap + val executeRegionById = executeRegions.map(region => region.id -> region).toMap + + val remappedRegions = remappedRegionByRegion.values.toSet + val remappedExecuteLinks = executeLinks.map { link => + val fromRegion = executeRegionById(link.fromRegionId) + val toRegion = executeRegionById(link.toRegionId) + RegionLink( + remappedRegionByRegion(fromRegion).id, + remappedRegionByRegion(toRegion).id + ) + } + RegionPlan(remappedRegions, remappedExecuteLinks) } /** - * Partitions a physical plan into Regions (skeletons). Cache vs must-execute - * classification and URI assignment are applied later in annotateRegionsWithCacheInfo - * using the RegionDAG and materialization edges. + * Partitions a physical plan into Regions and assigns search-time port bindings in two passes. * - * @param physicalPlan the original physical plan (without materializations) - * @param matEdges edges to be materialized (including blocking edges) - * @return a set of `Region` skeletons (resourceConfig/cached filled later) + * Pass 1 assigns output bindings for: + * - outputs of materialized edges in the current search state, + * - visible required outputs that must be freshly produced, + * - reuse-only required outputs that must bind to cached URIs. + * + * Pass 2 builds input bindings by wiring: + * - materialized-edge reader URIs from pass 1, + * - cache-fed input URIs precomputed during requiredness analysis. */ private def createRegions( physicalPlan: PhysicalPlan, matEdges: Set[PhysicalLink] ): Set[Region] = { - - // remove materialized edges and create connected components - val matEdgesRemovedDAG: PhysicalPlan = matEdges.foldLeft(physicalPlan)(_.removeLink(_)) val connectedComponents: Set[Graph[PhysicalOpIdentity, PhysicalLink]] = @@ -258,12 +396,8 @@ class CostBasedScheduleGenerator( matEdgesRemovedDAG.dag ).getConnectedComponents.asScala.toSet - // build region skeletons with no materialization information - - val regionSkeletons: Set[Region] = connectedComponents.zipWithIndex.map { + val regionsWithOutputBindings: Set[Region] = connectedComponents.zipWithIndex.map { case (connectedSubDAG, idx) => - // Operators and intra‑region pipelined links - val operators: Set[PhysicalOpIdentity] = connectedSubDAG.vertexSet().asScala.toSet val links: Set[PhysicalLink] = operators @@ -276,7 +410,6 @@ class CostBasedScheduleGenerator( val physicalOps: Set[PhysicalOp] = operators.map(physicalPlan.getOperator) - // Enumerate all ports belonging to the Region val ports: Set[GlobalPortIdentity] = physicalOps.flatMap { op => op.inputPorts.keys .map(inputPortId => GlobalPortIdentity(op.id, inputPortId, input = true)) @@ -285,19 +418,103 @@ class CostBasedScheduleGenerator( .toSet } - // Build the Region skeleton; cache/resourceConfig to be populated after classification. + val outputPortIdsFromMatEdges = matEdges + .filter(edge => operators.contains(edge.fromOpId)) + .map(edge => GlobalPortIdentity(edge.fromOpId, edge.fromPortId)) + val visibleFreshOutputPortIds = activePlanningBindings.visibleFreshOutputPorts + .filter(portId => operators.contains(portId.opId)) + val reuseOnlyOutputPortIds = activePlanningBindings.reuseOnlyOutputsByPort.keySet + .filter(portId => operators.contains(portId.opId)) + val outputPortIdsNeedingBindings = + outputPortIdsFromMatEdges ++ visibleFreshOutputPortIds ++ reuseOnlyOutputPortIds + + val outputPortConfigs = outputPortIdsNeedingBindings.map { outputPortId => + activePlanningBindings.reuseOnlyOutputsByPort.get(outputPortId) match { + case Some(cachedOutput) => + outputPortId -> OutputPortConfig( + storageURI = cachedOutput.resultUri, + cachedTupleCount = cachedOutput.tupleCount, + materialize = false + ) + case None => + outputPortId -> OutputPortConfig( + storageURI = createResultURI( + workflowId = workflowContext.workflowId, + executionId = workflowContext.executionId, + globalPortId = outputPortId + ), + cachedTupleCount = None, + materialize = true + ) + } + }.toMap + + val resourceConfig = + if (outputPortConfigs.nonEmpty) Some(ResourceConfig(portConfigs = outputPortConfigs)) + else None + Region( id = RegionIdentity(idx), physicalOps = physicalOps, physicalLinks = links, ports = ports, - resourceConfig = None, + resourceConfig = resourceConfig, cached = false ) } - // Regions are returned as skeletons; cache/URI assignment happens in annotateRegionsWithCacheInfo. - regionSkeletons + val regionByOperator = regionsWithOutputBindings + .flatMap(region => region.getOperators.map(op => op.id -> region)) + .toMap + + regionsWithOutputBindings.map { region => + val inputUrisFromMatEdges = matEdges + .filter(edge => region.physicalOps.exists(_.id == edge.toOpId)) + .foldLeft(Map.empty[GlobalPortIdentity, List[URI]]) { + case (acc, matEdge) => + val globalInputPortId = + GlobalPortIdentity(matEdge.toOpId, matEdge.toPortId, input = true) + val globalOutputPortId = GlobalPortIdentity(matEdge.fromOpId, matEdge.fromPortId) + val inputReaderUri = regionByOperator(matEdge.fromOpId) + .resourceConfig + .get + .portConfigs(globalOutputPortId) + .storageURIs + .head + acc.updated( + globalInputPortId, + acc.getOrElse(globalInputPortId, List.empty[URI]) :+ inputReaderUri + ) + } + + val cacheFedInputUris = activePlanningBindings.cacheFedInputUrisByPort + .filter { case (inputPortId, _) => region.ports.contains(inputPortId) } + .toMap + + val mergedInputUris = (inputUrisFromMatEdges.keySet ++ cacheFedInputUris.keySet) + .map { inputPortId => + val uris = + inputUrisFromMatEdges.getOrElse(inputPortId, List.empty) ++ + cacheFedInputUris.getOrElse(inputPortId, List.empty) + inputPortId -> uris + } + .toMap + + val inputPortConfigs = mergedInputUris.map { + case (inputPortId, uris) => + inputPortId -> IntermediateInputPortConfig(uris) + } + + val existingPortConfigs = region.resourceConfig.map(_.portConfigs).getOrElse(Map.empty) + val newResourceConfig = + if (inputPortConfigs.nonEmpty || existingPortConfigs.nonEmpty) { + Some(ResourceConfig(portConfigs = existingPortConfigs ++ inputPortConfigs)) + } else { + None + } + + region.copy(resourceConfig = newResourceConfig) + } } /** @@ -313,7 +530,7 @@ class CostBasedScheduleGenerator( val regionDAG = new DirectedAcyclicGraph[Region, RegionLink](classOf[RegionLink]) val regionGraph = new DirectedPseudograph[Region, RegionLink](classOf[RegionLink]) val opToRegionMap = new mutable.HashMap[PhysicalOpIdentity, Region] - createRegions(physicalPlan, matEdges).foreach(region => { + createRegions(activePlanningPlan, matEdges).foreach(region => { region.getOperators.foreach(op => opToRegionMap(op.id) = region) regionGraph.addVertex(region) regionDAG.addVertex(region) @@ -330,19 +547,13 @@ class CostBasedScheduleGenerator( isAcyclic = false } }) - if (isAcyclic) { - annotateRegionsWithCacheInfo(regionDAG, matEdges, opToRegionMap.toMap) - Left(regionDAG) - } else Right(regionGraph) + if (isAcyclic) Left(regionDAG) else Right(regionGraph) } /** - * Performs a search to generate a region DAG. - * Materializations are added only after the plan is determined to be schedulable. - * - * @return A region DAG. + * Runs Pasta search with timeout handling and returns the selected search result. */ - private def createRegionDAG(): DirectedAcyclicGraph[Region, RegionLink] = { + private def runSearchWithTimeout(): SearchResult = { val searchResultFuture: Future[SearchResult] = Future { workflowContext.workflowSettings.executionMode match { case ExecutionMode.MATERIALIZED => @@ -376,9 +587,7 @@ class CostBasedScheduleGenerator( s"WID: ${workflowContext.workflowId.id}, EID: ${workflowContext.executionId.id}, search time: " + s"${searchResult.searchTimeNanoSeconds / 1e6} ms." ) - - val regionDAG = searchResult.regionDAG - regionDAG + searchResult } /** @@ -400,10 +609,10 @@ class CostBasedScheduleGenerator( val startTime = System.nanoTime() val originalNonBlockingEdges = if (oCleanEdges) { - physicalPlan.getNonBridgeNonBlockingLinks + activePlanningPlan.getNonBridgeNonBlockingLinks } else { - physicalPlan.links.diff( - physicalPlan.getBlockingAndDependeeLinks + activePlanningPlan.links.diff( + activePlanningPlan.getBlockingAndDependeeLinks ) } // Queue to hold states to be explored, starting with the empty set @@ -434,7 +643,7 @@ class CostBasedScheduleGenerator( } visited.add(currentState) tryConnectRegionDAG( - physicalPlan.getBlockingAndDependeeLinks ++ currentState + activePlanningPlan.getBlockingAndDependeeLinks ++ currentState ) match { case Left(regionDAG) => updateOptimumIfApplicable(regionDAG) @@ -465,12 +674,12 @@ class CostBasedScheduleGenerator( */ def addNeighborStatesToFrontier(): Unit = { val allCurrentMaterializedEdges = - currentState ++ physicalPlan.getBlockingAndDependeeLinks + currentState ++ activePlanningPlan.getBlockingAndDependeeLinks // Generate and enqueue all neighbour states that haven't been visited var candidateEdges = originalNonBlockingEdges .diff(currentState) if (oChains) { - val edgesInChainWithMaterializedEdges = physicalPlan.maxChains + val edgesInChainWithMaterializedEdges = activePlanningPlan.maxChains .filter(chain => chain.intersect(allCurrentMaterializedEdges).nonEmpty) .flatten candidateEdges = candidateEdges.diff( @@ -501,7 +710,7 @@ class CostBasedScheduleGenerator( if (filteredNeighborStates.nonEmpty) { val minCostNeighborState = filteredNeighborStates.minBy(neighborState => tryConnectRegionDAG( - physicalPlan.getBlockingAndDependeeLinks ++ neighborState + activePlanningPlan.getBlockingAndDependeeLinks ++ neighborState ) match { case Left(regionDAG) => allocateResourcesAndEvaluateCost(regionDAG) @@ -527,7 +736,7 @@ class CostBasedScheduleGenerator( val startTime = System.nanoTime() val (regionDAG, cost) = - tryConnectRegionDAG(physicalPlan.links) match { + tryConnectRegionDAG(activePlanningPlan.links) match { case Left(dag) => (dag, allocateResourcesAndEvaluateCost(dag)) case Right(_) => ( @@ -561,14 +770,14 @@ class CostBasedScheduleGenerator( ): SearchResult = { val startTime = System.nanoTime() // Starting from a state where all non-blocking edges are materialized - val originalSeedState = physicalPlan.links.diff( - physicalPlan.getBlockingAndDependeeLinks + val originalSeedState = activePlanningPlan.links.diff( + activePlanningPlan.getBlockingAndDependeeLinks ) // Chain optimization: an edge in the same chain as a blocking edge should not be materialized val seedStateOptimizedByChainsIfApplicable = if (oChains) { - val edgesInChainWithBlockingEdge = physicalPlan.maxChains - .filter(chain => chain.intersect(physicalPlan.getBlockingAndDependeeLinks).nonEmpty) + val edgesInChainWithBlockingEdge = activePlanningPlan.maxChains + .filter(chain => chain.intersect(activePlanningPlan.getBlockingAndDependeeLinks).nonEmpty) .flatten originalSeedState.diff(edgesInChainWithBlockingEdge) } else { @@ -577,7 +786,7 @@ class CostBasedScheduleGenerator( // Clean edge optimization: a clean edge should not be materialized val finalSeedState = if (oCleanEdges) { - seedStateOptimizedByChainsIfApplicable.intersect(physicalPlan.getNonBridgeNonBlockingLinks) + seedStateOptimizedByChainsIfApplicable.intersect(activePlanningPlan.getNonBridgeNonBlockingLinks) } else { seedStateOptimizedByChainsIfApplicable } @@ -597,7 +806,7 @@ class CostBasedScheduleGenerator( val currentState = queue.dequeue() visited.add(currentState) tryConnectRegionDAG( - physicalPlan.getBlockingAndDependeeLinks ++ currentState + activePlanningPlan.getBlockingAndDependeeLinks ++ currentState ) match { case Left(regionDAG) => updateOptimumIfApplicable(regionDAG) @@ -640,7 +849,7 @@ class CostBasedScheduleGenerator( if (unvisitedNeighborStates.nonEmpty) { val minCostNeighborState = unvisitedNeighborStates.minBy(neighborState => tryConnectRegionDAG( - physicalPlan.getBlockingAndDependeeLinks ++ neighborState + activePlanningPlan.getBlockingAndDependeeLinks ++ neighborState ) match { case Left(regionDAG) => allocateResourcesAndEvaluateCost(regionDAG) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index e292cddf0b..fe34536c47 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -50,6 +50,7 @@ import org.apache.texera.amber.engine.architecture.scheduling.config.{ InputPortConfig, OperatorConfig, OutputPortConfig, + PortConfig, ResourceConfig } import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning @@ -140,8 +141,9 @@ class RegionExecutionCoordinator( val outputMetrics = resourceConfig.portConfigs .collect { case (gpid, cfg: OutputPortConfig) if gpid.opId == op.id => - // Only emit metrics for materialized outputs; UI treats missing ports as skipped. - val count = cfg.cachedTupleCount.getOrElse(0L) + // Emit metrics only for configured output ports in this cached region. + // Use -1 to preserve unknown cached counts in UI/stats instead of reporting 0. + val count = cfg.cachedTupleCount.getOrElse(-1L) PortTupleMetricsMapping(gpid.portId, TupleMetrics(count, 0L)) } .toSeq @@ -317,10 +319,12 @@ class RegionExecutionCoordinator( private def executeNonDependeePortPhase(): Future[Unit] = { setPhase(ExecutingNonDependeePortsPhase) - // Allocate output port storage objects + // Register reuse-only output bindings (cache-hit ports) without creating new storage objects. + registerReuseOnlyOutputPortResults(region.resourceConfig.get.portConfigs) + // Allocate output port storage objects for fresh materializations only. region.resourceConfig.get.portConfigs .collect { - case (id, cfg: OutputPortConfig) => id -> cfg + case (id, cfg: OutputPortConfig) if cfg.materialize => id -> cfg } .foreach { case (pid, cfg) => @@ -502,7 +506,7 @@ class RegionExecutionCoordinator( if gid == GlobalPortIdentity( opId = physicalOp.id, portId = outputPortId - ) => + ) && cfg.materialize => cfg.storageURI.toString } .getOrElse("") @@ -538,6 +542,25 @@ class RegionExecutionCoordinator( ) } + /** + * Persists result URI bindings for reuse-only output ports so consumers and UI can + * resolve cached results in the current execution without rematerialization. + */ + private def registerReuseOnlyOutputPortResults( + portConfigs: Map[GlobalPortIdentity, PortConfig] + ): Unit = { + portConfigs.foreach { + case (outputPortId, outputCfg: OutputPortConfig) + if !outputPortId.input && !outputCfg.materialize => + WorkflowExecutionsResource.insertOperatorPortResultUri( + eid = executionId, + globalPortId = outputPortId, + uri = outputCfg.storageURI + ) + case _ => + } + } + private def connectChannels(links: Set[PhysicalLink]): Future[Seq[EmptyReturn]] = { if (region.cached) { return Future.value(Seq.empty) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala index 8d6c616f79..cac339927d 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala @@ -31,8 +31,18 @@ sealed trait PortConfig { def storageURIs: List[URI] } -/** An output port requires exactly one materialization URI. */ -final case class OutputPortConfig(storageURI: URI, cachedTupleCount: Option[Long] = None) +/** + * Output port configuration for scheduling/runtime. + * + * @param storageURI URI bound at planning time for this output port. + * @param cachedTupleCount Optional cached tuple count for UI/metrics when serving from cache. + * @param materialize When false, this output is reuse-only (cache-hit) and must not be freshly materialized. + */ +final case class OutputPortConfig( + storageURI: URI, + cachedTupleCount: Option[Long] = None, + materialize: Boolean = true +) extends PortConfig { override val storageURIs: List[URI] = List(storageURI) } diff --git a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala index a2e2591802..7476a99d3c 100644 --- a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala +++ b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala @@ -156,7 +156,7 @@ class OperatorPortCacheDao(sqlServer: SqlServer) { /** * Insert or update a cache entry (upsert). - * On conflict (workflow_id, global_port_id, subdag_hash), updates the existing record. + * On conflict (workflow_id, global_port_id, subdag_hash), updates the existing record and refreshes updated_at. * * @param record OperatorPortCacheRecord to insert/update */ @@ -183,6 +183,7 @@ class OperatorPortCacheDao(sqlServer: SqlServer) { .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, dbRecord.getFingerprintJson) .set(OPERATOR_PORT_CACHE.TUPLE_COUNT, dbRecord.getTupleCount) .set(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, dbRecord.getSourceExecutionId) + .set(OPERATOR_PORT_CACHE.UPDATED_AT, DSL.currentOffsetDateTime()) .execute() } diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index a98e9801c7..ddf99413d6 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -2,18 +2,18 @@ ## Objective -Enable **incremental workflow execution** through cost-aware caching of operator output ports. When users iteratively refine workflows (modify parameters, add/remove operators), the system should: +Enable **incremental workflow execution** through deterministic cache reuse of operator output ports. Under MVP Assumption III, when users iteratively refine workflows (modify parameters, add/remove operators), the system should: 1. **Reuse cached results** where upstream computation is unchanged -2. **Make cost-aware decisions** comparing cache read cost vs recomputation cost +2. **Never recompute a required output port with a cache hit** (forced cache use) 3. **Maintain correctness** via deterministic fingerprinting of upstream sub-DAGs -4. **Preserve physical plan immutability** — caching is scheduler metadata, not plan modification +4. **Preserve physical plan immutability**: skip/execute/cache/fresh are scheduler annotations, not plan mutations -**Key principle**: The physical plan remains unchanged; cache-or-recompute decisions are made by the scheduler (Pasta / CostBasedScheduleGenerator) based on cache metadata keyed by a deterministic SHA-256 fingerprint of the upstream sub-DAG. +**Key principle (MVP)**: The physical plan remains unchanged. Scheduling is a two-step process in `CostBasedScheduleGenerator`: (1) deterministic requiredness propagation with forced cache use, then (2) original Pasta optimization on the residual fresh-required structure only. -**Research goals**: +**Future research goals**: -- Extend Pasta scheduler with cost-based caching decisions +- Extend Pasta scheduler with cost-based cache-vs-recompute decisions - Develop cost models and pruning heuristics for what/when to cache - Evaluate speedup on iterative data science workflows - (Optional) Lifecycle management: eviction, invalidation, garbage collection @@ -22,7 +22,7 @@ Enable **incremental workflow execution** through cost-aware caching of operator ### 1. Physical Plan Immutability -The physical plan is never modified to accommodate caching. Cache decisions are metadata passed to the scheduler via `WorkflowSettings.cachedOutputs`. This preserves the integrity of the Pasta scheduling framework. +The physical plan is never modified to accommodate caching. Cache decisions are metadata passed to the scheduler via `WorkflowSettings.cachedOutputs` and required output ports. ### 2. Fingerprint-Based Correctness @@ -38,21 +38,31 @@ A region is either fully cached (ToSkip) or fully executed (ToExecute) — no pa - Scheduler logic (binary cached flag per region) - Runtime state management (consistent execution mode) -- Cost model (compare full region costs) +- MVP planning (skip regions are excluded from Pasta planning scope) -### 4. Shallow State Hierarchy for Cached Regions +### 4. Forced Cache Use (Assumption III) + +For any required output port: + +- If cache exists, the requirement is satisfied from cache. +- The port is not recomputed for satisfying that requirement. +- There is no per-port or per-region cache-vs-recompute cost decision in MVP. + +This includes mixed-output operators: if one required output port is cache miss and another required output port is cache hit, the operator may execute, but consumers of the cache-hit output must still bind to the cached URI. + +### 5. Shallow State Hierarchy for Cached Regions ToSkip regions create lightweight state structures (Workflow → Region → Operator/Link) and store cached metrics at the operator level. No Worker/Channel states are created, so `numWorkers=0` and no worker assignments are emitted. -### 5. Stats Emission via Direct Client Updates +### 6. Stats Emission via Direct Client Updates Cached regions emit synthetic `ExecutionStatsUpdate` messages directly via `asyncRPCClient.sendToClient()`, with cached operator metrics (`numWorkers=0`). This reuses existing stats infrastructure without special-casing the frontend. -### 6. Explicit Cache State in Metrics +### 7. Explicit Cache State in Metrics Cached operators use a dedicated `COMPLETED_FROM_CACHE` state in `WorkflowAggregatedState` protobuf enum. This provides clear visual feedback to users and distinguishes cache-hit completion from normal execution completion. -### 7. Deferred Lifecycle Management +### 8. Deferred Lifecycle Management V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research topics for future work, not implementation blockers. @@ -99,26 +109,33 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research - Physical plan (immutable) - `cachedOutputs` (Δ): map of cache hits from step 1 -- Visible ports (☐): ports that must be materialized for user visibility - -**Region Classification**: - -- **Homogeneity constraint**: A region is either fully cached (ToSkip) or fully executed (ToExecute) — no mixing within a region -- **ToExecute regions**: Contain visible ports without cache hits OR depend on uncached intermediate materializations -- **ToSkip regions**: All required output ports have cache hits AND no visible ports need fresh computation - -**Cost Model** (currently simple, needs refinement for research): - -- Cached regions: cost = 0 -- Executing regions: cost = (# operators × DEFAULT_OPERATOR_COST) + materialization read/write costs -- Cache read/write: small fixed costs (0.5 per port) -- **Future**: Historical stats-based estimation from `runtime_statistics` table +- Required output ports (first-class): sink output ports + visible intermediate output ports (`outputPortsNeedingStorage`) + +**MVP Scheduling Flow (Assumption III)**: + +1. **Deterministic requiredness propagation** (port/edge aware): + - Seed required outports with required sink + visible ports. + - For each operator, if any required outport is cache miss, mark operator `Execute`; otherwise `Skip`. + - For each `Execute` operator, mark all upstream outports on incoming edges as required. + - Iterate to fixed point. +2. **Classify inputs of executing operators**: + - `Cache-fed` if upstream required outport has cache hit. + - `Fresh-required` if upstream required outport has cache miss. +3. **Residual Pasta optimization**: + - Build residual planning structure containing only `Execute` operators and `Fresh-required` dependencies. + - Run original Pasta search/regioning/materialization on this residual structure. + - Keep blocking-edge constraints within the residual scope. +4. **Assemble final full schedule**: + - Include residual execute regions (`ToExecute`) and skipped regions (`ToSkip`) in one schedule. + - Regions remain homogeneous (`cached=true/false`). + - All URI bindings are fixed at planning time. + - Cache-hit required outputs are marked as reuse-only and are never re-materialized during execution. **Output**: -- Schedule with regions marked `cached=true/false` -- Port configs updated with cached URIs for ToSkip regions -- Cached tuple counts reused in port metadata +- Single schedule over the full workflow with both `ToSkip` and `ToExecute` regions +- Planning-time URI bindings for cache-fed inputs and fresh-required outputs +- Reuse-only output semantics for cache-hit required ports to suppress rematerialization ### 3. Runtime Execution @@ -142,7 +159,7 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached - Input/output tuple counts from cached metadata - **UI**: The graph view displays `-` for cached input counts and for cached output ports that were not materialized; worker counts show `from cache`. - **Note**: Cached stats are synthetic (inputs default to 0; non-materialized outputs may be omitted). Do not use them for cost modeling until we add explicit tagging/filtering in `runtime_statistics`. -5. **Propagate cached URIs**: Downstream operators receive cached `result_uri` for materialized inputs +5. **Planning-time URI bindings**: Downstream operators consume URIs bound during scheduling; no runtime cache-vs-fresh URI decisions 6. **No WorkerAssignmentUpdate**: Cached regions don't send worker assignment events (consistent with numWorkers=0) 7. **Set phase to Completed**: Region lifecycle completes immediately @@ -169,6 +186,11 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached **Architecture note**: Event-based communication follows existing controller pattern - handler emits events via `sendToClient()`, service layer registers callbacks to handle them. Clean separation: engine layer knows nothing about web/service layer. +**Forced-cache nuance**: + +- If an operator executes due to one required output miss, other required outputs on the same operator that are cache hits are reuse-only. +- Those cache-hit outputs are not freshly materialized in this run, and consumers remain bound to cached URIs. + ### 4. Client-Side State Management **Location**: `ExecutionStatsService`, `ExecutionStateStore` @@ -325,7 +347,7 @@ Phase 1.1 Service/DAO architecture is complete. Key components: ### 7. Testing Strategy -- **Unit tests**: Fingerprint determinism, cost model logic, region classification +- **Unit tests**: Fingerprint determinism, forced-cache propagation logic, region classification - **Integration tests**: Cache upsert → DB verification, cache lookup → region marking - **E2E tests**: Run workflow → populate cache → re-run → verify ToSkip behavior and result correctness - **Change detection**: Modify operator params → verify fingerprint mismatch → cache miss @@ -368,11 +390,20 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache - **Fingerprinting**: `FingerprintUtil` implemented with workflow-based specs for deterministic subDAG hashing - **Submission-time lookup**: `WorkflowExecutionService` uses `OperatorPortCacheService.lookupCachedOutputs()` to compute fingerprints for all physical output ports, queries cache, stores hits in `WorkflowSettings.cachedOutputs` - **Cache persistence**: `PortCompletedHandler` emits `PortMaterialized` event → `ExecutionCacheService` → `OperatorPortCacheService.upsertCachedOutput()` → `OperatorPortCacheDao.upsert()` (includes fingerprint, URI, tuple count) -- **Scheduler integration**: `CostBasedScheduleGenerator` marks regions cached when all required outputs have hits, reuses cached URIs in port configs +- **Scheduler integration**: + - Implemented behavior (current branch): deterministic requiredness propagation + residual Pasta + full schedule assembly with skip regions excluded from Pasta planning scope + - Planning bindings are prepared before Pasta and consumed during `createRegions` in search: + - fresh-required outputs receive fresh URIs, + - cache-hit required outputs are bound as reuse-only (`materialize=false`), + - cache-fed inputs are pre-bound to cached URIs. + - No post-search region re-annotation step; resource allocation remains in Pasta search (`allocateResourcesAndEvaluateCost`) and final execute-region input configs stay as `InputPortConfig`. - **Runtime execution**: `RegionExecutionCoordinator` branches on `region.cached` flag: - ToSkip regions: `completeCachedRegion()` records cached operator metrics (numWorkers=0, processingTime=0) without creating workers, propagates cached URIs downstream - ToExecute regions: normal execution path - **Stats emission**: Cached regions emit `ExecutionStatsUpdate` via direct client updates, maintaining consistency with normal execution lifecycle +- **Controller stats query robustness**: + - `QueryWorkerStatisticsHandler` now uses optional operator-execution lookup and skips operators not yet initialized. + - This avoids `None.get` during global stats polling when schedule levels are launched incrementally (including skip-first ordering where some execute regions are still pending initialization). - **Frontend cache visualization** (Phase 1.2 - Complete): - Added `COMPLETED_FROM_CACHE` state to `WorkflowAggregatedState` protobuf enum - Added `CompletedFromCache` phase to `RegionExecutionCoordinator` for cached region lifecycle @@ -400,14 +431,14 @@ ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCache The cache system integrates with three layers: 1. **Execution Planning Layer**: Cache lookup at workflow submission, fingerprint computation -2. **Scheduler Layer (Pasta)**: Cost-based cache-or-recompute decisions, region classification -3. **Runtime Layer**: Short-circuit execution for cached regions, state management, stats emission +2. **Scheduler Layer (Pasta)**: Deterministic forced-cache propagation + residual Pasta optimization + full schedule assembly +3. **Runtime Layer**: Short-circuit execution for cached regions, planning-time URI binding consumption, stats emission ## Research Contributions -### 1. Cost Model & Pruning (Primary Contribution) +### 1. Cost Model & Pruning (Future Contribution) -**Goal**: Determine when caching is beneficial and what outputs to cache. +**Goal**: Determine when caching is beneficial and what outputs to cache, beyond MVP Assumption III. **Components**: @@ -417,9 +448,9 @@ The cache system integrates with three layers: - Skip caching small outputs (recompute is cheap) - Skip terminal operators (no downstream reuse) - Skip low-reuse-probability operators -- **Cache-or-recompute decision**: Per-region cost comparison (cache read vs recompute) +- **Cache-or-recompute decision**: Per-region or per-port comparison (cache read vs recompute) -**Current status**: Simple cost model (cached=0, execute>0). Need to develop historical stats-based estimation. +**Current MVP status**: Disabled for merge target. MVP uses forced cache use when required output ports have cache hits. **Data source**: `runtime_statistics` table already captures execution metrics (data/control processing time, tuple counts, worker counts per operator).
