This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5720-44df4f77013208248c0e5e59d7d7b505ec04ad56
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 2e0dd7ca45e58ae7a036591cd04aa5b156b55103
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 21 23:13:09 2026 -0700

    feat(execution): add PhysicalOp.requiresMaterializedExecution, consumed by the scheduler (#5720)
    
    ### What changes were proposed in this PR?
    
    Lets an operator declare it can only run under a fully-materialized
    schedule, and has the scheduler honor it:
    
    - `PhysicalOp` gains `requiresMaterializedExecution: Boolean = false` (+
    a `withRequiresMaterializedExecution` builder). It is a
    physical-execution property, so it lives on the physical op.
    - `CostBasedScheduleGenerator` consumes it: when any physical op
    requires materialized execution it forces a fully-materialized schedule
    regardless of the requested execution mode; otherwise the existing
    PIPELINED/MATERIALIZED logic runs unchanged.
    
    Default `false` ⇒ dormant and behavior-preserving: no operator requires
    it yet, so the scheduler's effective mode is unchanged today. The loop
    operators will set the flag on their physical op once they land.
    
    ### Any related issues, documentation, discussions?
    
    Resolves #5719 (sub-issue of #4442 "Introduce for loop"). Split out of
    #5700. Reflects the review discussion with @Yicong-Huang: the property
    belongs on `PhysicalOp`, and it is consumed by the scheduler.
    
    ### How was this PR tested?
    
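    `WorkflowCoreTypesSpec` covers the
    `PhysicalOp.requiresMaterializedExecution` default + builder, and
    `CostBasedScheduleGeneratorSpec` covers
    `CostBasedScheduleGenerator.effectiveExecutionMode` (MATERIALIZED is
    forced when an operator requires it; otherwise the requested mode is kept).
    `WorkflowExecutionService/Test/compile`, `scalafixAll --check`, and
    `scalafmtCheckAll` pass locally. The scheduler consumer is exercised
    end-to-end by the loop integration tests once the loop operators (which
    set the flag) land.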
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Co-authored with Claude Opus 4.8 in compliance with ASF.
---
 .../scheduling/CostBasedScheduleGenerator.scala    | 23 +++++++++++++-
 .../CostBasedScheduleGeneratorSpec.scala           | 37 ++++++++++++++++++++++
 .../texera/amber/core/workflow/PhysicalOp.scala    | 17 ++++++++++
 .../core/workflow/WorkflowCoreTypesSpec.scala      |  8 +++++
 4 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 44958718b2..85de480210 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -44,6 +44,23 @@ import scala.jdk.CollectionConverters._
 import scala.util.control.Breaks.{break, breakable}
 import scala.util.{Failure, Success, Try}
 
+object CostBasedScheduleGenerator {
+
+  /**
+    * The execution mode to schedule under: MATERIALIZED when any operator in
+    * `physicalPlan` requires it (e.g. the loop operators, whose back-edge is a
+    * cross-region materialized state channel), otherwise the requested mode.
+    */
+  private[scheduling] def effectiveExecutionMode(
+      physicalPlan: PhysicalPlan,
+      requestedMode: ExecutionMode
+  ): ExecutionMode =
+    if (physicalPlan.operators.exists(_.requiresMaterializedExecution))
+      ExecutionMode.MATERIALIZED
+    else
+      requestedMode
+}
+
 class CostBasedScheduleGenerator(
     workflowContext: WorkflowContext,
     initialPhysicalPlan: PhysicalPlan,
@@ -304,7 +321,11 @@ class CostBasedScheduleGenerator(
     */
   private def createRegionDAG(): DirectedAcyclicGraph[Region, RegionLink] = {
     val searchResultFuture: Future[SearchResult] = Future {
-      workflowContext.workflowSettings.executionMode match {
+      val effectiveMode = CostBasedScheduleGenerator.effectiveExecutionMode(
+        physicalPlan,
+        workflowContext.workflowSettings.executionMode
+      )
+      effectiveMode match {
         case ExecutionMode.MATERIALIZED =>
           getFullyMaterializedSearchState
         case ExecutionMode.PIPELINED =>
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
index 7d5227c36b..bff2917daa 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
@@ -21,6 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.texera.amber.core.workflow.{
   ExecutionMode,
+  PhysicalPlan,
   PortIdentity,
   WorkflowContext,
   WorkflowSettings
@@ -515,4 +516,40 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
     )
   }
 
+  "CostBasedScheduleGenerator.effectiveExecutionMode" should
+    "force MATERIALIZED when an operator requires it, even if PIPELINED is requested" in {
+    val workflow = buildWorkflow(
+      List(TestOperators.headerlessSmallCsvScanOpDesc()),
+      List(),
+      new WorkflowContext()
+    )
+    val planRequiringMaterialization = PhysicalPlan(
+      workflow.physicalPlan.operators.map(_.withRequiresMaterializedExecution(true)),
+      workflow.physicalPlan.links
+    )
+    assert(
+      CostBasedScheduleGenerator.effectiveExecutionMode(
+        planRequiringMaterialization,
+        ExecutionMode.PIPELINED
+      ) == ExecutionMode.MATERIALIZED
+    )
+  }
+
+  it should "keep the requested mode when no operator requires materialization" in {
+    val workflow = buildWorkflow(
+      List(TestOperators.headerlessSmallCsvScanOpDesc()),
+      List(),
+      new WorkflowContext()
+    )
+    val plan = workflow.physicalPlan
+    assert(
+      CostBasedScheduleGenerator.effectiveExecutionMode(plan, ExecutionMode.PIPELINED) ==
+        ExecutionMode.PIPELINED
+    )
+    assert(
+      CostBasedScheduleGenerator.effectiveExecutionMode(plan, ExecutionMode.MATERIALIZED) ==
+        ExecutionMode.MATERIALIZED
+    )
+  }
+
 }
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
index 44125045c9..1fdda49a6a 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
@@ -198,6 +198,16 @@ case class PhysicalOp(
     // schema propagation function
     propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas),
     isOneToManyOp: Boolean = false,
+    // Whether this operator can only run correctly under a fully-materialized
+    // schedule (e.g. a loop operator, whose back-edge is a cross-region
+    // materialized state channel that requires region-based re-execution).
+    // When ANY operator in the plan sets this, the schedule generator runs the
+    // WHOLE workflow fully materialized -- every link materialized, nothing
+    // pipelined -- not just this operator's own region boundaries. Whole-plan
+    // materialization is the minimal correct behavior for loops today;
+    // restricting it to only the requiring operator's regions is a possible
+    // future optimization. Default false.
+    requiresMaterializedExecution: Boolean = false,
     // hint for number of workers
     suggestedWorkerNum: Option[Int] = None,
     // name of the PVE to execute within
@@ -316,6 +326,13 @@ case class PhysicalOp(
   def withIsOneToManyOp(isOneToManyOp: Boolean): PhysicalOp =
     this.copy(isOneToManyOp = isOneToManyOp)
 
+  /**
+    * creates a copy specifying whether this operator can only run correctly
+    * under a fully-materialized schedule (see the field doc)
+    */
+  def withRequiresMaterializedExecution(requiresMaterializedExecution: Boolean): PhysicalOp =
+    this.copy(requiresMaterializedExecution = requiresMaterializedExecution)
+
   /**
     * Creates a copy of the PhysicalOp with the schema of a specified input port updated.
     * The schema can either be a successful schema definition or an error represented as a Throwable.
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala
index 11f73013bf..2bf47489a3 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala
@@ -142,6 +142,14 @@ class WorkflowCoreTypesSpec extends AnyFlatSpec {
     assert(op.parallelizable, "the original instance is immutable")
   }
 
+  "PhysicalOp.withRequiresMaterializedExecution" should "default to false and round-trip through copy" in {
+    val op = newPhysicalOp("a")
+    assert(!op.requiresMaterializedExecution, "defaults to false")
+    val flipped = op.withRequiresMaterializedExecution(true)
+    assert(flipped.requiresMaterializedExecution)
+    assert(!op.requiresMaterializedExecution, "the original instance is immutable")
+  }
+
   "PhysicalOp.withSuggestedWorkerNum" should "set the suggested worker count" in {
     val op = newPhysicalOp("a").withSuggestedWorkerNum(7)
     assert(op.suggestedWorkerNum.contains(7))

Reply via email to