This is an automated email from the ASF dual-hosted git repository.

xiaozhenliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new c7fcd9f427 refactor(amber): move resource allocator into cost 
estimator (#3550)
c7fcd9f427 is described below

commit c7fcd9f4278f850e3c1456866ba6f25f85a8f489
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Wed Aug 20 18:25:08 2025 -0700

    refactor(amber): move resource allocator into cost estimator (#3550)
    
    For the cost-based scheduler in Amber, the ResourceAllocator should ideally
    be part of the CostEstimator. However, for historical reasons, and because
    the current ResourceAllocator is not cost-based at all, these two modules
    have been kept separate.
    
    To make the module boundaries clearer and to simplify future updates to the
    ResourceAllocator, this PR merges the ResourceAllocator into the
    CostEstimator.
    
    Note that because the ResourceAllocator does not currently produce a real
    cost, the CostEstimator still uses a cost model that does not rely on the
    resource allocator. Once the ResourceAllocator produces a real cost, these
    two modules can be consolidated further.
    
    ### Code Changes
    - `ResourceAllocator` is now inside `CostEstimator` for
    `CostBasedScheduleGenerator`
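    - `allocateResource()` is now executed as part of the search process
    for `CostBasedScheduleGenerator` (previously it ran after the search),
    and is merged with `evaluate()` into `allocateResourcesAndEvaluateCost()`;
    likewise, `CostEstimator.estimate()` is replaced by
    `allocateResourcesAndEstimateCost()`, and `ResourceAllocator.allocate()`
    now returns a `ResourceConfig` instead of a new `Region`.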
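    - Moved `allocateResource()`, `getRegions()`, and `populateDependeeLinks()`,
    which are only used by `ExpansionGreedyScheduleGenerator`, from the base
    class into that class, and moved `replaceVertex()` into a new
    `SchedulingUtils` object.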
---
 .../scheduling/CostBasedScheduleGenerator.scala    | 59 ++++++++--------
 .../architecture/scheduling/CostEstimator.scala    | 30 ++++++--
 .../ExpansionGreedyScheduleGenerator.scala         | 71 ++++++++++++++++++-
 .../scheduling/ScheduleGenerator.scala             | 79 ++--------------------
 .../architecture/scheduling/SchedulingUtils.scala  | 57 ++++++++++++++++
 .../resourcePolicies/ResourceAllocator.scala       | 10 +--
 .../scheduling/DefaultCostEstimatorSpec.scala      | 35 ++++++++--
 7 files changed, 223 insertions(+), 118 deletions(-)

diff --git 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 59ef4deaf8..2d26c3fed5 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -29,6 +29,7 @@ import edu.uci.ics.amber.core.workflow.{
   PhysicalPlan,
   WorkflowContext
 }
+import 
edu.uci.ics.amber.engine.architecture.scheduling.SchedulingUtils.replaceVertex
 import edu.uci.ics.amber.engine.architecture.scheduling.config.{
   IntermediateInputPortConfig,
   OutputPortConfig,
@@ -68,7 +69,11 @@ class CostBasedScheduleGenerator(
   )
 
   private val costEstimator =
-    new DefaultCostEstimator(workflowContext = workflowContext, actorId = 
actorId)
+    new DefaultCostEstimator(
+      workflowContext = workflowContext,
+      resourceAllocator = resourceAllocator,
+      actorId = actorId
+    )
 
   def generate(): (Schedule, PhysicalPlan) = {
     val startTime = System.nanoTime()
@@ -323,7 +328,6 @@ class CostBasedScheduleGenerator(
     )
 
     val regionDAG = searchResult.regionDAG
-    allocateResource(regionDAG)
     regionDAG
   }
 
@@ -397,10 +401,7 @@ class CostBasedScheduleGenerator(
       def updateOptimumIfApplicable(regionDAG: DirectedAcyclicGraph[Region, 
RegionLink]): Unit = {
         if (oEarlyStop) schedulableStates.add(currentState)
         // Calculate the current state's cost and update the bestResult if 
it's lower
-        val cost =
-          evaluate(
-            RegionPlan(regionDAG.vertexSet().asScala.toSet, 
regionDAG.edgeSet().asScala.toSet)
-          )
+        val cost = allocateResourcesAndEvaluateCost(regionDAG)
         if (cost < bestResult.cost) {
           bestResult = SearchResult(currentState, regionDAG, cost)
         }
@@ -453,12 +454,7 @@ class CostBasedScheduleGenerator(
                 physicalPlan.getBlockingAndDependeeLinks ++ neighborState
               ) match {
                 case Left(regionDAG) =>
-                  evaluate(
-                    RegionPlan(
-                      regionDAG.vertexSet().asScala.toSet,
-                      regionDAG.edgeSet().asScala.toSet
-                    )
-                  )
+                  allocateResourcesAndEvaluateCost(regionDAG)
                 case Right(_) =>
                   Double.MaxValue
               }
@@ -544,10 +540,7 @@ class CostBasedScheduleGenerator(
         */
       def updateOptimumIfApplicable(regionDAG: DirectedAcyclicGraph[Region, 
RegionLink]): Unit = {
         // Calculate the current state's cost and update the bestResult if 
it's lower
-        val cost =
-          evaluate(
-            RegionPlan(regionDAG.vertexSet().asScala.toSet, 
regionDAG.edgeSet().asScala.toSet)
-          )
+        val cost = allocateResourcesAndEvaluateCost(regionDAG)
         if (cost < bestResult.cost) {
           bestResult = SearchResult(currentState, regionDAG, cost)
         }
@@ -577,12 +570,7 @@ class CostBasedScheduleGenerator(
                 physicalPlan.getBlockingAndDependeeLinks ++ neighborState
               ) match {
                 case Left(regionDAG) =>
-                  evaluate(
-                    RegionPlan(
-                      regionDAG.vertexSet().asScala.toSet,
-                      regionDAG.edgeSet().asScala.toSet
-                    )
-                  )
+                  allocateResourcesAndEvaluateCost(regionDAG)
                 case Right(_) =>
                   Double.MaxValue
               }
@@ -601,16 +589,33 @@ class CostBasedScheduleGenerator(
   }
 
   /**
-    * The cost function used by the search. Takes a region plan, generates one 
or more (to be done in the future)
-    * schedules based on the region plan, and calculates the cost of the 
schedule(s) using Cost Estimator. Uses the cost
-    * of the best schedule (currently only considers one schedule) as the cost 
of the region plan.
+    * Takes a region DAG, generates one or more (to be done in the future) 
schedules based on the region DAG, allocates
+    * resources to each region in the region DAG, and calculates the cost of 
the schedule(s) using Cost Estimator. Uses
+    * the cost of the best schedule (currently only considers one schedule) as 
the cost of the region DAG.
     *
     * @return A cost determined by the cost estimator.
     */
-  private def evaluate(regionPlan: RegionPlan): Double = {
+  private def allocateResourcesAndEvaluateCost(
+      regionDAG: DirectedAcyclicGraph[Region, RegionLink]
+  ): Double = {
+    val regionPlan =
+      RegionPlan(regionDAG.vertexSet().asScala.toSet, 
regionDAG.edgeSet().asScala.toSet)
     val schedule = generateScheduleFromRegionPlan(regionPlan)
     // In the future we may allow multiple regions in a level and split the 
resources.
-    schedule.map(level => level.map(region => costEstimator.estimate(region, 
1)).sum).sum
+    schedule
+      .map(level =>
+        level
+          .map(region => {
+            val (resourceConfig, regionCost) =
+              costEstimator.allocateResourcesAndEstimateCost(region, 1)
+            // Update the region in the regionDAG to be the new region with 
resources allocated.
+            val regionWithResourceConfig = region.copy(resourceConfig = 
Some(resourceConfig))
+            replaceVertex(regionDAG, region, regionWithResourceConfig)
+            regionCost
+          })
+          .sum
+      )
+      .sum
   }
 
 }
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
index 6e7d5d8b85..78964f57bb 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
@@ -24,6 +24,8 @@ import edu.uci.ics.amber.core.tuple.Tuple
 import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity
 import edu.uci.ics.amber.core.workflow.WorkflowContext
 import 
edu.uci.ics.amber.engine.architecture.scheduling.DefaultCostEstimator.DEFAULT_OPERATOR_COST
+import edu.uci.ics.amber.engine.architecture.scheduling.config.ResourceConfig
+import 
edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.ResourceAllocator
 import edu.uci.ics.amber.engine.common.AmberLogging
 import edu.uci.ics.texera.dao.SqlServer
 import edu.uci.ics.texera.dao.SqlServer.withTransaction
@@ -36,7 +38,15 @@ import scala.util.{Failure, Success, Try}
   * A cost estimator should estimate a cost of running a region under the 
given resource constraints as units.
   */
 trait CostEstimator {
-  def estimate(region: Region, resourceUnits: Int): Double
+
+  /**
+    * Uses the given resource units to allocate resources to the region, and 
determine a cost based on the allocation.
+    *
+    * Note currently the ResourceAllocator is not cost-based and thus we use a 
cost model that does not rely on the
+    * allocator, i.e., the cost estimation process is external to the 
ResourceAllocator.
+    * @return A ResourceConfig for the region and an estimated cost of this 
region.
+    */
+  def allocateResourcesAndEstimateCost(region: Region, resourceUnits: Int): 
(ResourceConfig, Double)
 }
 
 object DefaultCostEstimator {
@@ -50,6 +60,7 @@ object DefaultCostEstimator {
   */
 class DefaultCostEstimator(
     workflowContext: WorkflowContext,
+    val resourceAllocator: ResourceAllocator,
     val actorId: ActorVirtualIdentity
 ) extends CostEstimator
     with AmberLogging {
@@ -73,8 +84,15 @@ class DefaultCostEstimator(
     case Some(_) =>
   }
 
-  override def estimate(region: Region, resourceUnits: Int): Double = {
-    this.operatorEstimatedTimeOption match {
+  override def allocateResourcesAndEstimateCost(
+      region: Region,
+      resourceUnits: Int
+  ): (ResourceConfig, Double) = {
+    // Currently the dummy cost from resourceAllocator is discarded.
+    val (resourceConfig, _) = resourceAllocator.allocate(region)
+    // We use a cost model that does not rely on the resource allocation.
+    // TODO: Once the ResourceAllocator actually calculates a cost, we can use 
its calculated cost.
+    val cost = this.operatorEstimatedTimeOption match {
       case Some(operatorEstimatedTime) =>
         // Use past statistics (wall-clock runtime). We use the execution time 
of the longest-running
         // operator in each region to represent the region's execution time, 
and use the sum of all the regions'
@@ -89,11 +107,9 @@ class DefaultCostEstimator(
         // Without past statistics (e.g., first execution), we use number of 
ports needing storage as the cost.
         // Each port needing storage has a portConfig.
         // This is independent of the schedule / resource allocator.
-        region.resourceConfig match {
-          case Some(config) => config.portConfigs.size
-          case None         => 0
-        }
+        resourceConfig.portConfigs.size
     }
+    (resourceConfig, cost)
   }
 
   /**
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
index 9db1a8988d..8cbc7075bf 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
@@ -29,7 +29,7 @@ import edu.uci.ics.amber.core.workflow.{
   PhysicalPlan,
   WorkflowContext
 }
-import 
edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
+import SchedulingUtils.replaceVertex
 import edu.uci.ics.amber.engine.architecture.scheduling.config.{
   IntermediateInputPortConfig,
   OutputPortConfig,
@@ -37,11 +37,12 @@ import 
edu.uci.ics.amber.engine.architecture.scheduling.config.{
 }
 import org.jgrapht.alg.connectivity.BiconnectivityInspector
 import org.jgrapht.graph.DirectedAcyclicGraph
+import org.jgrapht.traverse.TopologicalOrderIterator
 
 import java.net.URI
 import scala.annotation.tailrec
 import scala.collection.mutable
-import scala.jdk.CollectionConverters.CollectionHasAsScala
+import scala.jdk.CollectionConverters.{CollectionHasAsScala, 
IteratorHasAsScala}
 
 @deprecated(
   "This greedy schedule generator will be removed in the future. Use 
CostBasedScheduleGenerator instead."
@@ -361,6 +362,72 @@ class ExpansionGreedyScheduleGenerator(
     newPhysicalPlan
   }
 
+  private def allocateResource(
+      regionDAG: DirectedAcyclicGraph[Region, RegionLink]
+  ): Unit = {
+    // generate the resource configs
+    new TopologicalOrderIterator(regionDAG).asScala
+      .foreach(region => {
+        val (resourceConfig, _) = resourceAllocator.allocate(region)
+        val regionWithResourceConfig = region.copy(resourceConfig = 
Some(resourceConfig))
+        replaceVertex(regionDAG, region, regionWithResourceConfig)
+      })
+  }
+
+  private def getRegions(
+      physicalOpId: PhysicalOpIdentity,
+      regionDAG: DirectedAcyclicGraph[Region, RegionLink]
+  ): Set[Region] = {
+    regionDAG
+      .vertexSet()
+      .asScala
+      .filter(region => region.getOperators.map(_.id).contains(physicalOpId))
+      .toSet
+  }
+
+  /**
+    * For a dependee input link, although it connects two regions A->B, we 
include this link and its toOp in region A
+    * so that the dependee link will be completed first.
+    */
+  private def populateDependeeLinks(
+      regionDAG: DirectedAcyclicGraph[Region, RegionLink]
+  ): Unit = {
+
+    val dependeeLinks = physicalPlan
+      .topologicalIterator()
+      .flatMap { physicalOpId =>
+        val upstreamPhysicalOpIds = 
physicalPlan.getUpstreamPhysicalOpIds(physicalOpId)
+        upstreamPhysicalOpIds.flatMap { upstreamPhysicalOpId =>
+          physicalPlan
+            .getLinksBetween(upstreamPhysicalOpId, physicalOpId)
+            .filter(link =>
+              physicalPlan
+                .getOperator(physicalOpId)
+                .isInputLinkDependee(link)
+            )
+        }
+      }
+      .toSet
+
+    dependeeLinks
+      .flatMap { link => getRegions(link.fromOpId, regionDAG).map(region => 
region -> link) }
+      .groupBy(_._1)
+      .view
+      .mapValues(_.map(_._2))
+      .foreach {
+        case (region, links) =>
+          val newRegion = region.copy(
+            physicalLinks = region.physicalLinks ++ links,
+            physicalOps =
+              region.getOperators ++ links.map(_.toOpId).map(id => 
physicalPlan.getOperator(id)),
+            ports = region.getPorts ++ links.map(dependeeLink =>
+              GlobalPortIdentity(dependeeLink.toOpId, dependeeLink.toPortId, 
input = true)
+            )
+          )
+          replaceVertex(regionDAG, region, newRegion)
+      }
+  }
+
   /**
     * This function creates and connects a region DAG while conducting 
materialization replacement.
     * It keeps attempting to create a region DAG from the given PhysicalPlan. 
When failed, a list
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
index ad6187104d..41405ebf3a 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala
@@ -22,7 +22,6 @@ package edu.uci.ics.amber.engine.architecture.scheduling
 import edu.uci.ics.amber.config.ApplicationConfig
 import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity
 import edu.uci.ics.amber.core.workflow._
-import 
edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
 import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
   DefaultResourceAllocator,
   ExecutionClusterInfo
@@ -69,6 +68,12 @@ abstract class ScheduleGenerator(
     var physicalPlan: PhysicalPlan
 ) {
   private val executionClusterInfo = new ExecutionClusterInfo()
+  val resourceAllocator =
+    new DefaultResourceAllocator(
+      physicalPlan,
+      executionClusterInfo,
+      workflowContext.workflowSettings
+    )
 
   def generate(): (Schedule, PhysicalPlan)
 
@@ -121,76 +126,4 @@ abstract class ScheduleGenerator(
     }.toMap
     Schedule(levelSets)
   }
-
-  def allocateResource(
-      regionDAG: DirectedAcyclicGraph[Region, RegionLink]
-  ): Unit = {
-
-    val resourceAllocator =
-      new DefaultResourceAllocator(
-        physicalPlan,
-        executionClusterInfo,
-        workflowContext.workflowSettings
-      )
-    // generate the resource configs
-    new TopologicalOrderIterator(regionDAG).asScala
-      .foreach(region => {
-        val (newRegion, _) = resourceAllocator.allocate(region)
-        replaceVertex(regionDAG, region, newRegion)
-      })
-  }
-
-  def getRegions(
-      physicalOpId: PhysicalOpIdentity,
-      regionDAG: DirectedAcyclicGraph[Region, RegionLink]
-  ): Set[Region] = {
-    regionDAG
-      .vertexSet()
-      .asScala
-      .filter(region => region.getOperators.map(_.id).contains(physicalOpId))
-      .toSet
-  }
-
-  /**
-    * For a dependee input link, although it connects two regions A->B, we 
include this link and its toOp in region A
-    * so that the dependee link will be completed first.
-    */
-  def populateDependeeLinks(
-      regionDAG: DirectedAcyclicGraph[Region, RegionLink]
-  ): Unit = {
-
-    val dependeeLinks = physicalPlan
-      .topologicalIterator()
-      .flatMap { physicalOpId =>
-        val upstreamPhysicalOpIds = 
physicalPlan.getUpstreamPhysicalOpIds(physicalOpId)
-        upstreamPhysicalOpIds.flatMap { upstreamPhysicalOpId =>
-          physicalPlan
-            .getLinksBetween(upstreamPhysicalOpId, physicalOpId)
-            .filter(link =>
-              physicalPlan
-                .getOperator(physicalOpId)
-                .isInputLinkDependee(link)
-            )
-        }
-      }
-      .toSet
-
-    dependeeLinks
-      .flatMap { link => getRegions(link.fromOpId, regionDAG).map(region => 
region -> link) }
-      .groupBy(_._1)
-      .view
-      .mapValues(_.map(_._2))
-      .foreach {
-        case (region, links) =>
-          val newRegion = region.copy(
-            physicalLinks = region.physicalLinks ++ links,
-            physicalOps =
-              region.getOperators ++ links.map(_.toOpId).map(id => 
physicalPlan.getOperator(id)),
-            ports = region.getPorts ++ links.map(dependeeLink =>
-              GlobalPortIdentity(dependeeLink.toOpId, dependeeLink.toPortId, 
input = true)
-            )
-          )
-          replaceVertex(regionDAG, region, newRegion)
-      }
-  }
 }
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/SchedulingUtils.scala
 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/SchedulingUtils.scala
new file mode 100644
index 0000000000..46ba8580b1
--- /dev/null
+++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/SchedulingUtils.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package edu.uci.ics.amber.engine.architecture.scheduling
+
+import org.jgrapht.graph.DirectedAcyclicGraph
+
+import scala.jdk.CollectionConverters.CollectionHasAsScala
+
+object SchedulingUtils {
+
+  def replaceVertex(
+      graph: DirectedAcyclicGraph[Region, RegionLink],
+      oldVertex: Region,
+      newVertex: Region
+  ): Unit = {
+    if (oldVertex.equals(newVertex)) {
+      return
+    }
+    graph.addVertex(newVertex)
+    graph
+      .outgoingEdgesOf(oldVertex)
+      .asScala
+      .toList
+      .foreach(oldEdge => {
+        val dest = graph.getEdgeTarget(oldEdge)
+        graph.removeEdge(oldEdge)
+        graph.addEdge(newVertex, dest, RegionLink(newVertex.id, dest.id))
+      })
+    graph
+      .incomingEdgesOf(oldVertex)
+      .asScala
+      .toList
+      .foreach(oldEdge => {
+        val source = graph.getEdgeSource(oldEdge)
+        graph.removeEdge(oldEdge)
+        graph.addEdge(source, newVertex, RegionLink(source.id, newVertex.id))
+      })
+    graph.removeVertex(oldVertex)
+  }
+}
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala
 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala
index 7a7f9add58..39f3f5b9ac 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala
@@ -33,7 +33,7 @@ import java.net.URI
 import scala.collection.mutable
 
 trait ResourceAllocator {
-  def allocate(region: Region): (Region, Double)
+  def allocate(region: Region): (ResourceConfig, Double)
 }
 
 class DefaultResourceAllocator(
@@ -58,14 +58,14 @@ class DefaultResourceAllocator(
     *
     * @param region The region for which to allocate resources.
     * @return A tuple containing:
-    *         1) A new Region instance with new resource configuration.
-    *         2) An estimated cost of the workflow with the new resource 
configuration,
+    *         1) A resource configuration.
+    *         2) An estimated cost of the workflow with the resource 
configuration,
     *         represented as a Double value (currently set to 0, but will be
     *         updated in the future).
     */
   def allocate(
       region: Region
-  ): (Region, Double) = {
+  ): (ResourceConfig, Double) = {
 
     val opToOperatorConfigMapping = region.getOperators
       .map(physicalOp => physicalOp.id -> 
OperatorConfig(generateWorkerConfigs(physicalOp)))
@@ -137,7 +137,7 @@ class DefaultResourceAllocator(
       portConfigs
     )
 
-    (region.copy(resourceConfig = Some(resourceConfig)), 0)
+    (resourceConfig, 0)
   }
 
   /**
diff --git 
a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala
 
b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala
index c4fb8408e6..1700c26e9f 100644
--- 
a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala
+++ 
b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala
@@ -25,6 +25,10 @@ import edu.uci.ics.amber.core.storage.{DocumentFactory, 
VFSURIFactory}
 import edu.uci.ics.amber.core.tuple.Tuple
 import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
 import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity, 
WorkflowContext}
+import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
+  DefaultResourceAllocator,
+  ExecutionClusterInfo
+}
 import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER
 import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow
 import edu.uci.ics.amber.operator.TestOperators
@@ -128,9 +132,16 @@ class DefaultCostEstimatorSpec
       ),
       new WorkflowContext()
     )
+    val resourceAllocator =
+      new DefaultResourceAllocator(
+        workflow.physicalPlan,
+        new ExecutionClusterInfo(),
+        workflow.context.workflowSettings
+      )
 
     val costEstimator = new DefaultCostEstimator(
       workflow.context,
+      resourceAllocator,
       CONTROLLER
     )
     val ports = workflow.physicalPlan.operators.flatMap(op =>
@@ -148,7 +159,7 @@ class DefaultCostEstimatorSpec
       ports = ports
     )
 
-    val costOfRegion = costEstimator.estimate(region, 1)
+    val (_, costOfRegion) = 
costEstimator.allocateResourcesAndEstimateCost(region, 1)
 
     assert(costOfRegion == 0)
   }
@@ -215,8 +226,16 @@ class DefaultCostEstimatorSpec
     writer.putOne(keywordOpRuntimeStatistics)
     writer.close()
 
+    val resourceAllocator =
+      new DefaultResourceAllocator(
+        workflow.physicalPlan,
+        new ExecutionClusterInfo(),
+        workflow.context.workflowSettings
+      )
+
     val costEstimator = new DefaultCostEstimator(
       workflow.context,
+      resourceAllocator,
       CONTROLLER
     )
 
@@ -235,7 +254,7 @@ class DefaultCostEstimatorSpec
       ports = ports
     )
 
-    val costOfRegion = costEstimator.estimate(region, 1)
+    val (_, costOfRegion) = 
costEstimator.allocateResourcesAndEstimateCost(region, 1)
 
     assert(costOfRegion != 0)
   }
@@ -337,12 +356,20 @@ class DefaultCostEstimatorSpec
     val keywordRegion =
       searchResult.regionDAG.vertexSet().asScala.filter(region => 
region.physicalOps.size == 1).head
 
+    val resourceAllocator =
+      new DefaultResourceAllocator(
+        workflow.physicalPlan,
+        new ExecutionClusterInfo(),
+        workflow.context.workflowSettings
+      )
+
     val costEstimator = new DefaultCostEstimator(
       workflow.context,
+      resourceAllocator,
       CONTROLLER
     )
 
-    val groupByRegionCost = costEstimator.estimate(groupByRegion, 1)
+    val (_, groupByRegionCost) = 
costEstimator.allocateResourcesAndEstimateCost(groupByRegion, 1)
 
     val groupByOperatorCost = 
(groupByOpRuntimeStatistics.getField(6).asInstanceOf[Long] +
       groupByOpRuntimeStatistics.getField(7).asInstanceOf[Long]) / 1e9
@@ -352,7 +379,7 @@ class DefaultCostEstimatorSpec
     // The GroupBy operator has a longer running time.
     assert(groupByRegionCost == groupByOperatorCost)
 
-    val keywordRegionCost = costEstimator.estimate(keywordRegion, 1)
+    val (_, keywordRegionCost) = 
costEstimator.allocateResourcesAndEstimateCost(keywordRegion, 1)
 
     val keywordOperatorCost = 
(keywordOpRuntimeStatistics.getField(6).asInstanceOf[Long] +
       keywordOpRuntimeStatistics.getField(7).asInstanceOf[Long]) / 1e9

Reply via email to