This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit bdaa01f8e22d2cdbc81c79f8c21326a97f036daf
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Fri Dec 5 11:30:38 2025 -0800

    feat(cache): update cost model temporarily
---
 .../scheduling/CostBasedScheduleGenerator.scala    |   6 +-
 .../architecture/scheduling/CostEstimator.scala    | 118 ++++-----------------
 2 files changed, 23 insertions(+), 101 deletions(-)

diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index cb517d4125..5d9233c035 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -388,8 +388,8 @@ class CostBasedScheduleGenerator(
     */
   def bottomUpSearch(
       globalSearch: Boolean = false,
-      oChains: Boolean = true,
-      oCleanEdges: Boolean = true,
+      oChains: Boolean = false,
+      oCleanEdges: Boolean = false,
       oEarlyStop: Boolean = true
   ): SearchResult = {
     val startTime = System.nanoTime()
@@ -528,7 +528,7 @@ class CostBasedScheduleGenerator(
     */
   def topDownSearch(
       globalSearch: Boolean = false,
-      oChains: Boolean = true,
+      oChains: Boolean = false,
       oCleanEdges: Boolean = true
   ): SearchResult = {
     val startTime = System.nanoTime()
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostEstimator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostEstimator.scala
index e14a9f7f6f..2fe09e815d 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostEstimator.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/CostEstimator.scala
@@ -19,20 +19,17 @@
 
 package org.apache.amber.engine.architecture.scheduling
 
-import org.apache.amber.core.storage.DocumentFactory
-import org.apache.amber.core.tuple.Tuple
 import org.apache.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.amber.core.workflow.WorkflowContext
 import 
org.apache.amber.engine.architecture.scheduling.DefaultCostEstimator.DEFAULT_OPERATOR_COST
-import org.apache.amber.engine.architecture.scheduling.config.ResourceConfig
+import org.apache.amber.engine.architecture.scheduling.config.{
+  InputPortConfig,
+  IntermediateInputPortConfig,
+  OutputPortConfig,
+  ResourceConfig
+}
 import 
org.apache.amber.engine.architecture.scheduling.resourcePolicies.ResourceAllocator
 import org.apache.amber.engine.common.AmberLogging
-import org.apache.texera.dao.SqlServer
-import org.apache.texera.dao.SqlServer.withTransaction
-import org.apache.texera.dao.jooq.generated.Tables.{WORKFLOW_EXECUTIONS, 
WORKFLOW_VERSION}
-
-import java.net.URI
-import scala.util.{Failure, Success, Try}
 
 /**
   * A cost estimator should estimate a cost of running a region under the 
given resource constraints as units.
@@ -65,101 +62,26 @@ class DefaultCostEstimator(
 ) extends CostEstimator
     with AmberLogging {
 
-  // Requires mysql database to retrieve execution statistics, otherwise use 
number of materialized ports as a default.
-  private val operatorEstimatedTimeOption = Try(
-    this.getOperatorExecutionTimeInSeconds(
-      this.workflowContext.workflowId.id
-    )
-  ) match {
-    case Failure(_)      => None
-    case Success(result) => result
-  }
-
-  operatorEstimatedTimeOption match {
-    case None =>
-      logger.info(
-        s"WID: ${workflowContext.workflowId.id}, EID: 
${workflowContext.executionId.id}, " +
-          s"no past execution statistics available. Using number of 
materialized output ports as the cost. "
-      )
-    case Some(_) =>
-  }
-
   override def allocateResourcesAndEstimateCost(
       region: Region,
       resourceUnits: Int
   ): (ResourceConfig, Double) = {
     // Currently the dummy cost from resourceAllocator is discarded.
     val (resourceConfig, _) = resourceAllocator.allocate(region)
-    // We use a cost model that does not rely on the resource allocation.
-    // TODO: Once the ResourceAllocator actually calculates a cost, we can use 
its calculated cost.
-    val cost = this.operatorEstimatedTimeOption match {
-      case Some(operatorEstimatedTime) =>
-        // Use past statistics (wall-clock runtime). We use the execution time 
of the longest-running
-        // operator in each region to represent the region's execution time, 
and use the sum of all the regions'
-        // execution time as the wall-clock runtime of the workflow.
-        // This assumes a schedule is a total-order of the regions.
-        val opExecutionTimes = region.getOperators.map(op => {
-          operatorEstimatedTime.getOrElse(op.id.logicalOpId.id, 
DEFAULT_OPERATOR_COST)
-        })
-        val longestRunningOpExecutionTime = opExecutionTimes.max
-        longestRunningOpExecutionTime
-      case None =>
-        // Without past statistics (e.g., first execution), we use number of 
ports needing storage as the cost.
-        // Each port needing storage has a portConfig.
-        // This is independent of the schedule / resource allocator.
-        resourceConfig.portConfigs.size
-    }
+    val cost =
+      if (region.cached) {
+        0.0
+      } else {
+        val opCost = region.getOperators.size * DEFAULT_OPERATOR_COST
+        val writeCost = resourceConfig.portConfigs.values.collect { case _: 
OutputPortConfig =>
+          0.5
+        }.sum
+        val readCost = resourceConfig.portConfigs.values.collect {
+          case _: InputPortConfig              => 0.5
+          case _: IntermediateInputPortConfig => 0.5
+        }.sum
+        opCost + writeCost + readCost
+      }
     (resourceConfig, cost)
   }
-
-  /**
-    * Retrieve the latest successful execution to get statistics to calculate 
costs in DefaultCostEstimator.
-    * Using the total control processing time plus data processing time of an 
operator as its cost.
-    * If no past statistics are available (e.g., first execution), return None.
-    */
-  private def getOperatorExecutionTimeInSeconds(
-      wid: Long
-  ): Option[Map[String, Double]] = {
-
-    val uriString: String = withTransaction(
-      SqlServer
-        .getInstance()
-        .createDSLContext()
-    ) { context =>
-      context
-        .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
-        .from(WORKFLOW_EXECUTIONS)
-        .join(WORKFLOW_VERSION)
-        .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
-        .where(
-          WORKFLOW_VERSION.WID
-            .eq(wid.toInt)
-            .and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte))
-        )
-        .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc())
-        .limit(1)
-        .fetchOneInto(classOf[String])
-    }
-
-    if (uriString == null || uriString.isEmpty) {
-      None
-    } else {
-      val uri: URI = new URI(uriString)
-      val document = DocumentFactory.openDocument(uri)
-
-      val maxStats = document._1
-        .get()
-        .foldLeft(Map.empty[String, Double]) { (acc, tuple) =>
-          val record = tuple.asInstanceOf[Tuple]
-          val operatorId = record.getField(0).asInstanceOf[String]
-          val dataProcessingTime = record.getField(6).asInstanceOf[Long]
-          val controlProcessingTime = record.getField(7).asInstanceOf[Long]
-          val currentMaxTime = acc.getOrElse(operatorId, 0.0)
-          val newTime = (dataProcessingTime + controlProcessingTime) / 1e9
-          acc + (operatorId -> Math.max(currentMaxTime, newTime))
-        }
-
-      if (maxStats.isEmpty) None else Some(maxStats)
-    }
-  }
 }

Reply via email to