This is an automated email from the ASF dual-hosted git repository. xiaozhenliu pushed a commit to branch xiaozhen-resource-allocator-in-cost-estimator in repository https://gitbox.apache.org/repos/asf/texera.git
commit 8bd36d680a02705341d0607220c8f6194e723da1 Author: Xiao-zhen-Liu <[email protected]> AuthorDate: Tue Aug 19 19:57:03 2025 -0700 refactoring to address comments. --- .../scheduling/CostBasedScheduleGenerator.scala | 6 ++++-- .../architecture/scheduling/CostEstimator.scala | 20 ++++++++------------ .../resourcePolicies/ResourceAllocator.scala | 10 +++++----- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala index 1fa013049b..2d26c3fed5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala @@ -606,9 +606,11 @@ class CostBasedScheduleGenerator( .map(level => level .map(region => { - val (newRegion, regionCost) = costEstimator.allocateResourcesAndEstimateCost(region, 1) + val (resourceConfig, regionCost) = + costEstimator.allocateResourcesAndEstimateCost(region, 1) // Update the region in the regionDAG to be the new region with resources allocated. - replaceVertex(regionDAG, region, newRegion) + val regionWithResourceConfig = region.copy(resourceConfig = Some(resourceConfig)) + replaceVertex(regionDAG, region, regionWithResourceConfig) regionCost }) .sum diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala index 0e51328094..78964f57bb 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala @@ -24,13 +24,12 @@ import edu.uci.ics.amber.core.tuple.Tuple import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.engine.architecture.scheduling.DefaultCostEstimator.DEFAULT_OPERATOR_COST -import edu.uci.ics.amber.engine.architecture.scheduling.SchedulingUtils.replaceVertex +import edu.uci.ics.amber.engine.architecture.scheduling.config.ResourceConfig import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.ResourceAllocator import edu.uci.ics.amber.engine.common.AmberLogging import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.dao.SqlServer.withTransaction import edu.uci.ics.texera.dao.jooq.generated.Tables.{WORKFLOW_EXECUTIONS, WORKFLOW_VERSION} -import org.jgrapht.graph.DirectedAcyclicGraph import java.net.URI import scala.util.{Failure, Success, Try} @@ -45,9 +44,9 @@ trait CostEstimator { * * Note currently the ResourceAllocator is not cost-based and thus we use a cost model that does not rely on the * allocator, i.e., the cost estimation process is external to the ResourceAllocator. - * @return An updated region with allocated resources and an estimated cost of this region. + * @return A ResourceConfig for the region and an estimated cost of this region. */ - def allocateResourcesAndEstimateCost(region: Region, resourceUnits: Int): (Region, Double) + def allocateResourcesAndEstimateCost(region: Region, resourceUnits: Int): (ResourceConfig, Double) } object DefaultCostEstimator { @@ -88,9 +87,9 @@ class DefaultCostEstimator( override def allocateResourcesAndEstimateCost( region: Region, resourceUnits: Int - ): (Region, Double) = { + ): (ResourceConfig, Double) = { // Currently the dummy cost from resourceAllocator is discarded. - val (newRegion, _) = resourceAllocator.allocate(region) + val (resourceConfig, _) = resourceAllocator.allocate(region) // We use a cost model that does not rely on the resource allocation. // TODO: Once the ResourceAllocator actually calculates a cost, we can use its calculated cost. val cost = this.operatorEstimatedTimeOption match { @@ -99,7 +98,7 @@ class DefaultCostEstimator( // operator in each region to represent the region's execution time, and use the sum of all the regions' // execution time as the wall-clock runtime of the workflow. // This assumes a schedule is a total-order of the regions. - val opExecutionTimes = newRegion.getOperators.map(op => { + val opExecutionTimes = region.getOperators.map(op => { operatorEstimatedTime.getOrElse(op.id.logicalOpId.id, DEFAULT_OPERATOR_COST) }) val longestRunningOpExecutionTime = opExecutionTimes.max @@ -108,12 +107,9 @@ class DefaultCostEstimator( // Without past statistics (e.g., first execution), we use number of ports needing storage as the cost. // Each port needing storage has a portConfig. // This is independent of the schedule / resource allocator. - newRegion.resourceConfig match { - case Some(config) => config.portConfigs.size - case None => 0 - } + resourceConfig.portConfigs.size } - (newRegion, cost) + (resourceConfig, cost) } /** diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala index 7a7f9add58..39f3f5b9ac 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala @@ -33,7 +33,7 @@ import java.net.URI import scala.collection.mutable trait ResourceAllocator { - def allocate(region: Region): (Region, Double) + def allocate(region: Region): (ResourceConfig, Double) } class DefaultResourceAllocator( @@ -58,14 +58,14 @@ class DefaultResourceAllocator( * * @param region The region for which to allocate resources. * @return A tuple containing: - * 1) A new Region instance with new resource configuration. - * 2) An estimated cost of the workflow with the new resource configuration, + * 1) A resource configuration. + * 2) An estimated cost of the workflow with the resource configuration, * represented as a Double value (currently set to 0, but will be * updated in the future). */ def allocate( region: Region - ): (Region, Double) = { + ): (ResourceConfig, Double) = { val opToOperatorConfigMapping = region.getOperators .map(physicalOp => physicalOp.id -> OperatorConfig(generateWorkerConfigs(physicalOp))) @@ -137,7 +137,7 @@ class DefaultResourceAllocator( portConfigs ) - (region.copy(resourceConfig = Some(resourceConfig)), 0) + (resourceConfig, 0) } /**
