This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 2e0dd7ca45 feat(execution): add PhysicalOp.requiresMaterializedExecution, consumed by the scheduler (#5720)
2e0dd7ca45 is described below
commit 2e0dd7ca45e58ae7a036591cd04aa5b156b55103
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 21 23:13:09 2026 -0700
feat(execution): add PhysicalOp.requiresMaterializedExecution, consumed by the scheduler (#5720)
### What changes were proposed in this PR?
Lets an operator declare it can only run under a fully-materialized
schedule, and has the scheduler honor it:
- `PhysicalOp` gains `requiresMaterializedExecution: Boolean = false` (+
a `withRequiresMaterializedExecution` builder). It is a
physical-execution property, so it lives on the physical op.
- `CostBasedScheduleGenerator` consumes it: when any physical op
requires materialized execution it forces a fully-materialized schedule
regardless of the requested execution mode; otherwise the existing
PIPELINED/MATERIALIZED logic runs unchanged.
Default `false` ⇒ dormant and behavior-preserving: no operator requires
it yet, so the scheduler's effective mode is unchanged today. The loop
operators will set the flag on their physical op once they land.
### Any related issues, documentation, discussions?
Resolves #5719 (sub-issue of #4442 "Introduce for loop"). Split out of
#5700. Reflects the review discussion with @Yicong-Huang: the property
belongs on `PhysicalOp`, and it is consumed by the scheduler.
### How was this PR tested?
`WorkflowCoreTypesSpec` covers the
`PhysicalOp.requiresMaterializedExecution` default + builder.
`WorkflowExecutionService/Test/compile`, `scalafixAll --check`, and
`scalafmtCheckAll` pass locally. The scheduler consumer is exercised
end-to-end by the loop integration tests once the loop operators (which
set the flag) land.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.
---
.../scheduling/CostBasedScheduleGenerator.scala | 23 +++++++++++++-
.../CostBasedScheduleGeneratorSpec.scala | 37 ++++++++++++++++++++++
.../texera/amber/core/workflow/PhysicalOp.scala | 17 ++++++++++
.../core/workflow/WorkflowCoreTypesSpec.scala | 8 +++++
4 files changed, 84 insertions(+), 1 deletion(-)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 44958718b2..85de480210 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -44,6 +44,23 @@ import scala.jdk.CollectionConverters._
import scala.util.control.Breaks.{break, breakable}
import scala.util.{Failure, Success, Try}
+object CostBasedScheduleGenerator {
+
+ /**
+ * The execution mode to schedule under: MATERIALIZED when any operator in
+ * `physicalPlan` requires it (e.g. the loop operators, whose back-edge is a
+ * cross-region materialized state channel), otherwise the requested mode.
+ */
+ private[scheduling] def effectiveExecutionMode(
+ physicalPlan: PhysicalPlan,
+ requestedMode: ExecutionMode
+ ): ExecutionMode =
+ if (physicalPlan.operators.exists(_.requiresMaterializedExecution))
+ ExecutionMode.MATERIALIZED
+ else
+ requestedMode
+}
+
class CostBasedScheduleGenerator(
workflowContext: WorkflowContext,
initialPhysicalPlan: PhysicalPlan,
@@ -304,7 +321,11 @@ class CostBasedScheduleGenerator(
*/
private def createRegionDAG(): DirectedAcyclicGraph[Region, RegionLink] = {
val searchResultFuture: Future[SearchResult] = Future {
- workflowContext.workflowSettings.executionMode match {
+ val effectiveMode = CostBasedScheduleGenerator.effectiveExecutionMode(
+ physicalPlan,
+ workflowContext.workflowSettings.executionMode
+ )
+ effectiveMode match {
case ExecutionMode.MATERIALIZED =>
getFullyMaterializedSearchState
case ExecutionMode.PIPELINED =>
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
index 7d5227c36b..bff2917daa 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
@@ -21,6 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
import org.apache.texera.amber.core.workflow.{
ExecutionMode,
+ PhysicalPlan,
PortIdentity,
WorkflowContext,
WorkflowSettings
@@ -515,4 +516,40 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
)
}
+ "CostBasedScheduleGenerator.effectiveExecutionMode" should
+ "force MATERIALIZED when an operator requires it, even if PIPELINED is requested" in {
+ val workflow = buildWorkflow(
+ List(TestOperators.headerlessSmallCsvScanOpDesc()),
+ List(),
+ new WorkflowContext()
+ )
+ val planRequiringMaterialization = PhysicalPlan(
+ workflow.physicalPlan.operators.map(_.withRequiresMaterializedExecution(true)),
+ workflow.physicalPlan.links
+ )
+ assert(
+ CostBasedScheduleGenerator.effectiveExecutionMode(
+ planRequiringMaterialization,
+ ExecutionMode.PIPELINED
+ ) == ExecutionMode.MATERIALIZED
+ )
+ }
+
+ it should "keep the requested mode when no operator requires materialization" in {
+ val workflow = buildWorkflow(
+ List(TestOperators.headerlessSmallCsvScanOpDesc()),
+ List(),
+ new WorkflowContext()
+ )
+ val plan = workflow.physicalPlan
+ assert(
+ CostBasedScheduleGenerator.effectiveExecutionMode(plan, ExecutionMode.PIPELINED) ==
+ ExecutionMode.PIPELINED
+ )
+ assert(
+ CostBasedScheduleGenerator.effectiveExecutionMode(plan, ExecutionMode.MATERIALIZED) ==
+ ExecutionMode.MATERIALIZED
+ )
+ }
+
}
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
index 44125045c9..1fdda49a6a 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
@@ -198,6 +198,16 @@ case class PhysicalOp(
// schema propagation function
propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas),
isOneToManyOp: Boolean = false,
+ // Whether this operator can only run correctly under a fully-materialized
+ // schedule (e.g. a loop operator, whose back-edge is a cross-region
+ // materialized state channel that requires region-based re-execution).
+ // When ANY operator in the plan sets this, the schedule generator runs the
+ // WHOLE workflow fully materialized -- every link materialized, nothing
+ // pipelined -- not just this operator's own region boundaries. Whole-plan
+ // materialization is the minimal correct behavior for loops today;
+ // restricting it to only the requiring operator's regions is a possible
+ // future optimization. Default false.
+ requiresMaterializedExecution: Boolean = false,
// hint for number of workers
suggestedWorkerNum: Option[Int] = None,
// name of the PVE to execute within
@@ -316,6 +326,13 @@ case class PhysicalOp(
def withIsOneToManyOp(isOneToManyOp: Boolean): PhysicalOp =
this.copy(isOneToManyOp = isOneToManyOp)
+ /**
+ * creates a copy specifying whether this operator can only run correctly
+ * under a fully-materialized schedule (see the field doc)
+ */
+ def withRequiresMaterializedExecution(requiresMaterializedExecution: Boolean): PhysicalOp =
+ this.copy(requiresMaterializedExecution = requiresMaterializedExecution)
+
/**
* Creates a copy of the PhysicalOp with the schema of a specified input port updated.
* The schema can either be a successful schema definition or an error represented as a Throwable.
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala
index 11f73013bf..2bf47489a3 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala
@@ -142,6 +142,14 @@ class WorkflowCoreTypesSpec extends AnyFlatSpec {
assert(op.parallelizable, "the original instance is immutable")
}
+ "PhysicalOp.withRequiresMaterializedExecution" should "default to false and round-trip through copy" in {
+ val op = newPhysicalOp("a")
+ assert(!op.requiresMaterializedExecution, "defaults to false")
+ val flipped = op.withRequiresMaterializedExecution(true)
+ assert(flipped.requiresMaterializedExecution)
+ assert(!op.requiresMaterializedExecution, "the original instance is immutable")
+ }
+
"PhysicalOp.withSuggestedWorkerNum" should "set the suggested worker count" in {
val op = newPhysicalOp("a").withSuggestedWorkerNum(7)
assert(op.suggestedWorkerNum.contains(7))