This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit bdaa01f8e22d2cdbc81c79f8c21326a97f036daf Author: Xiaozhen Liu <[email protected]> AuthorDate: Fri Dec 5 11:30:38 2025 -0800 feat(cache): update cost model temporarily --- .../scheduling/CostBasedScheduleGenerator.scala | 6 +- .../architecture/scheduling/CostEstimator.scala | 118 ++++----------------- 2 files changed, 23 insertions(+), 101 deletions(-) diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala index cb517d4125..5d9233c035 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala @@ -388,8 +388,8 @@ class CostBasedScheduleGenerator( */ def bottomUpSearch( globalSearch: Boolean = false, - oChains: Boolean = true, - oCleanEdges: Boolean = true, + oChains: Boolean = false, + oCleanEdges: Boolean = false, oEarlyStop: Boolean = true ): SearchResult = { val startTime = System.nanoTime() @@ -528,7 +528,7 @@ class CostBasedScheduleGenerator( */ def topDownSearch( globalSearch: Boolean = false, - oChains: Boolean = true, + oChains: Boolean = false, oCleanEdges: Boolean = true ): SearchResult = { val startTime = System.nanoTime() diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostEstimator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostEstimator.scala index e14a9f7f6f..2fe09e815d 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostEstimator.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostEstimator.scala @@ -19,20 +19,17 @@ package org.apache.amber.engine.architecture.scheduling -import org.apache.amber.core.storage.DocumentFactory -import org.apache.amber.core.tuple.Tuple import org.apache.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.amber.core.workflow.WorkflowContext import org.apache.amber.engine.architecture.scheduling.DefaultCostEstimator.DEFAULT_OPERATOR_COST -import org.apache.amber.engine.architecture.scheduling.config.ResourceConfig +import org.apache.amber.engine.architecture.scheduling.config.{ + InputPortConfig, + IntermediateInputPortConfig, + OutputPortConfig, + ResourceConfig +} import org.apache.amber.engine.architecture.scheduling.resourcePolicies.ResourceAllocator import org.apache.amber.engine.common.AmberLogging -import org.apache.texera.dao.SqlServer -import org.apache.texera.dao.SqlServer.withTransaction -import org.apache.texera.dao.jooq.generated.Tables.{WORKFLOW_EXECUTIONS, WORKFLOW_VERSION} - -import java.net.URI -import scala.util.{Failure, Success, Try} /** * A cost estimator should estimate a cost of running a region under the given resource constraints as units. @@ -65,101 +62,26 @@ class DefaultCostEstimator( ) extends CostEstimator with AmberLogging { - // Requires mysql database to retrieve execution statistics, otherwise use number of materialized ports as a default. - private val operatorEstimatedTimeOption = Try( - this.getOperatorExecutionTimeInSeconds( - this.workflowContext.workflowId.id - ) - ) match { - case Failure(_) => None - case Success(result) => result - } - - operatorEstimatedTimeOption match { - case None => - logger.info( - s"WID: ${workflowContext.workflowId.id}, EID: ${workflowContext.executionId.id}, " + - s"no past execution statistics available. Using number of materialized output ports as the cost. " - ) - case Some(_) => - } - override def allocateResourcesAndEstimateCost( region: Region, resourceUnits: Int ): (ResourceConfig, Double) = { // Currently the dummy cost from resourceAllocator is discarded. val (resourceConfig, _) = resourceAllocator.allocate(region) - // We use a cost model that does not rely on the resource allocation. - // TODO: Once the ResourceAllocator actually calculates a cost, we can use its calculated cost. - val cost = this.operatorEstimatedTimeOption match { - case Some(operatorEstimatedTime) => - // Use past statistics (wall-clock runtime). We use the execution time of the longest-running - // operator in each region to represent the region's execution time, and use the sum of all the regions' - // execution time as the wall-clock runtime of the workflow. - // This assumes a schedule is a total-order of the regions. - val opExecutionTimes = region.getOperators.map(op => { - operatorEstimatedTime.getOrElse(op.id.logicalOpId.id, DEFAULT_OPERATOR_COST) - }) - val longestRunningOpExecutionTime = opExecutionTimes.max - longestRunningOpExecutionTime - case None => - // Without past statistics (e.g., first execution), we use number of ports needing storage as the cost. - // Each port needing storage has a portConfig. - // This is independent of the schedule / resource allocator. - resourceConfig.portConfigs.size - } + val cost = + if (region.cached) { + 0.0 + } else { + val opCost = region.getOperators.size * DEFAULT_OPERATOR_COST + val writeCost = resourceConfig.portConfigs.values.collect { case _: OutputPortConfig => + 0.5 + }.sum + val readCost = resourceConfig.portConfigs.values.collect { + case _: InputPortConfig => 0.5 + case _: IntermediateInputPortConfig => 0.5 + }.sum + opCost + writeCost + readCost + } (resourceConfig, cost) } - - /** - * Retrieve the latest successful execution to get statistics to calculate costs in DefaultCostEstimator. - * Using the total control processing time plus data processing time of an operator as its cost. - * If no past statistics are available (e.g., first execution), return None. - */ - private def getOperatorExecutionTimeInSeconds( - wid: Long - ): Option[Map[String, Double]] = { - - val uriString: String = withTransaction( - SqlServer - .getInstance() - .createDSLContext() - ) { context => - context - .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI) - .from(WORKFLOW_EXECUTIONS) - .join(WORKFLOW_VERSION) - .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID)) - .where( - WORKFLOW_VERSION.WID - .eq(wid.toInt) - .and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte)) - ) - .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc()) - .limit(1) - .fetchOneInto(classOf[String]) - } - - if (uriString == null || uriString.isEmpty) { - None - } else { - val uri: URI = new URI(uriString) - val document = DocumentFactory.openDocument(uri) - - val maxStats = document._1 - .get() - .foldLeft(Map.empty[String, Double]) { (acc, tuple) => - val record = tuple.asInstanceOf[Tuple] - val operatorId = record.getField(0).asInstanceOf[String] - val dataProcessingTime = record.getField(6).asInstanceOf[Long] - val controlProcessingTime = record.getField(7).asInstanceOf[Long] - val currentMaxTime = acc.getOrElse(operatorId, 0.0) - val newTime = (dataProcessingTime + controlProcessingTime) / 1e9 - acc + (operatorId -> Math.max(currentMaxTime, newTime)) - } - - if (maxStats.isEmpty) None else Some(maxStats) - } - } }
