This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5707-b2d216a2ad728454d354a45de4ea5de1e391f81f in repository https://gitbox.apache.org/repos/asf/texera.git
commit da99a359458bb285908752779b4c2dc3f11ecd89
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jun 16 17:11:35 2026 -0700

feat(scheduling): reuse output storage across region re-executions (#5707)

### What changes were proposed in this PR?

Adds an opt-in mechanism that lets an output port **reuse** its storage when the owning operator's region re-executes, instead of recreating the document each time. The mechanism is dormant and behavior-preserving: no operator sets the flag in this PR.

- `OutputPort` gains a `reuseStorage: Boolean` proto field (alongside `blocking` / `mode`). It marks a port whose output accumulates across region re-executions, e.g. a Loop End port whose result builds up over the iterations of its own loop.
- `DocumentFactory.createOrReuseDocument(uri, schema, reuseExisting, …)` makes the create-or-reuse decision: when reuse is requested and a document already exists, it opens and returns that document; otherwise it creates a fresh one. It returns the document in both cases, so the call site does not branch.
- `RegionExecutionCoordinator` reads each output port's `reuseStorage` flag while provisioning that port's result and state documents, and routes both through `createOrReuseDocument`.

| port flag | region re-run behavior |
|---|---|
| `false` (every operator today) | recreate output/state documents (unchanged) |
| `true` (set by Loop End in the loop PR) | keep and reopen the existing documents |

A runtime guard (`require`) in `RegionExecutionCoordinator` rejects any port that sets `reuseStorage` for now: the flag activates only with the loop operators, which are not yet on `main`. The guard keeps the dormant reuse path from being silently exercised before its consumer exists, and will be removed when the loop operators land.

### Any related issues, documentation, discussions?

Resolves #5709 (sub-issue of #4442 "Introduce for loop"). Split out of #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's [review](https://github.com/apache/texera/pull/4206#pullrequestreview-4482667715).
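The table above can be illustrated with a minimal sketch, assuming a hypothetical in-memory store in place of `DocumentFactory` and its Iceberg-backed documents. `Doc`, `InMemoryStore`, `createOrReuse`, and `runRegion` are illustrative names, not Texera APIs. The decision in `createOrReuse` has the same shape as `createOrReuseDocument`: reuse only when requested *and* a document already exists, otherwise create (which replaces whatever was stored).

```scala
import java.net.URI
import scala.collection.mutable

object ReuseStorageSketch {
  // Hypothetical stand-in for a storage document: a URI plus a row buffer.
  final class Doc(val uri: URI) {
    val rows: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
  }

  // Hypothetical in-memory backend standing in for DocumentFactory.
  object InMemoryStore {
    private val docs = mutable.Map.empty[URI, Doc]

    def exists(uri: URI): Boolean = docs.contains(uri)
    def open(uri: URI): Doc = docs(uri)

    // Creating replaces any document already stored at `uri`.
    def create(uri: URI): Doc = {
      val doc = new Doc(uri)
      docs(uri) = doc
      doc
    }

    // Reuse only when asked to AND a document exists; because of the
    // short-circuiting &&, `exists` is never consulted when reuse is off.
    def createOrReuse(uri: URI, reuseExisting: Boolean): Doc =
      if (reuseExisting && exists(uri)) open(uri) else create(uri)
  }

  // One simulated region execution: provision the port's document, then
  // append the row this execution produced.
  def runRegion(uri: URI, reuseStorage: Boolean, row: String): Doc = {
    val doc = InMemoryStore.createOrReuse(uri, reuseStorage)
    doc.rows += row
    doc
  }
}
```

Running a region three times against the same URI, a port with `reuseStorage = true` ends up holding all three rows, while a port with `reuseStorage = false` holds only the row from the last run, matching the two rows of the table.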
### How was this PR tested?

- `DocumentFactorySpec`: pins the create-or-reuse decision (the reuse × exists matrix plus the "no-reuse never probes existence" short-circuit) with injected document stubs, so no Iceberg backend is needed.
- `OutputPortReuseFlagSpec`: guards that no registered operator enables `reuseStorage` on any output port.
- `WorkflowCore` / `WorkflowOperator` / `WorkflowExecutionService` compile; scalafmt and scalafix are clean.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.

---
 .../scheduling/RegionExecutionCoordinator.scala    | 29 ++++++-
 .../org/apache/texera/amber/core/workflow.proto    |  6 ++
 .../amber/core/storage/DocumentFactory.scala       | 22 +++++
 .../amber/core/storage/DocumentFactorySpec.scala   | 96 ++++++++++++++++++++++
 .../metadata/OutputPortReuseFlagSpec.scala         | 48 +++++++++++
 5 files changed, 199 insertions(+), 2 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 5a9df11b58..4497d7c4ae 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -576,8 +576,33 @@ class RegionExecutionCoordinator(
         region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
       val schema =
         schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
-      DocumentFactory.createDocument(resultURI, schema)
-      DocumentFactory.createDocument(stateURI, State.schema)
+      // An output port whose storage accumulates across region re-executions
+      // (e.g. a LoopEnd port, whose output builds up over the iterations of
+      // its own loop) sets `reuseStorage`. When set, the port's existing
+      // document is kept and reopened on each re-run; when unset, a fresh one
+      // is created. Read per output port -- storage behavior is port-specific.
+      // (The inner LoopEnd of a nested loop additionally drops its output
+      // once per outer iteration on the Python worker side in
+      // MainLoop._process_state_frame, which is orthogonal to this.)
+      val reuseStorage =
+        region
+          .getOperator(outputPortId.opId)
+          .outputPorts(outputPortId.portId)
+          ._1
+          .reuseStorage
+      // Guard: no operator enables reuseStorage in production yet -- it
+      // activates with the loop operators, which aren't on main. Until then
+      // this fails loudly so the dormant reuse path is never silently
+      // exercised. Remove/relax this guard when introducing the loop operators.
+      require(
+        !reuseStorage,
+        s"Output port $outputPortId set reuseStorage, which is not " +
+          "supported in production yet (it activates with the loop operators)."
+      )
+      Seq((resultURI, schema), (stateURI, State.schema)).foreach {
+        case (uri, sch) =>
+          DocumentFactory.createOrReuseDocument(uri, sch, reuseStorage)
+      }
       if (!isRestart) {
         val (_, eid, _, _) = decodeURI(resultURI)
         WorkflowExecutionsResource.insertOperatorPortResultUri(
diff --git a/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto b/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto
index c5b2cb248f..b3b72ed924 100644
--- a/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto
+++ b/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto
@@ -62,6 +62,12 @@ message OutputPort {
   string displayName = 2;
   bool blocking = 3;
   OutputMode mode = 4;
+  // Whether storage at this port persists across the owning operator's region
+  // re-executions: when set, the existing document is kept and appended to on
+  // each re-run; when unset, it is recreated. Set e.g. on a LoopEnd port whose
+  // output accumulates across the iterations of its own loop. The region
+  // scheduler reads this when provisioning the port's output document.
+  bool reuseStorage = 5;
 }

diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index d3fcae868f..a26340e79c 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -133,6 +133,28 @@ object DocumentFactory {
     }
   }
 
+  /**
+   * Return the document at `uri`: when `reuseExisting` is set and a document
+   * already exists there, open and return the existing one -- so a caller whose
+   * output accumulates across re-runs (e.g. a LoopEnd port whose region
+   * re-executes once per loop iteration) keeps the already-populated document
+   * instead of clobbering it, since `createDocument` overrides any existing
+   * document. Otherwise create it.
+   *
+   * `exists` / `open` / `create` default to this object's own `documentExists`
+   * / `openDocument` / `createDocument`; they are parameterized only so the
+   * create-or-reuse decision can be unit-tested without an iceberg backend.
+   */
+  def createOrReuseDocument(
+      uri: URI,
+      schema: Schema,
+      reuseExisting: Boolean,
+      exists: URI => Boolean = documentExists,
+      open: URI => VirtualDocument[_] = (u: URI) => openDocument(u)._1,
+      create: (URI, Schema) => VirtualDocument[_] = createDocument
+  ): VirtualDocument[_] =
+    if (reuseExisting && exists(uri)) open(uri) else create(uri, schema)
+
   /**
    * Open a document specified by the uri.
    * If the document is storing structural data, the schema will also be returned
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala
new file mode 100644
index 0000000000..d67b90f363
--- /dev/null
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.storage
+
+import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.tuple.{Schema, Tuple}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.net.URI
+
+/**
+ * Unit tests for `DocumentFactory.createOrReuseDocument`, the create-or-reuse
+ * decision behind output-port storage provisioning. It always returns the
+ * document (opened when reused, created otherwise) so the call site doesn't
+ * branch.
+ *
+ * `exists` / `open` / `create` are injected so the decision can be pinned with
+ * trivial document stubs -- no iceberg backend, no live region.
+ */
+class DocumentFactorySpec extends AnyFlatSpec with Matchers {
+
+  private val uri = new URI("vfs:///wf/result/loop-end")
+  private val schema = Schema()
+
+  private def stubDoc: VirtualDocument[Tuple] =
+    new VirtualDocument[Tuple] {
+      override def getURI: URI = uri
+      override def clear(): Unit = ()
+    }
+  private val opened: VirtualDocument[_] = stubDoc
+  private val created: VirtualDocument[_] = stubDoc
+
+  /** Run with spies; return (document handed back, which path was taken). */
+  private def run(reuseExisting: Boolean, exists: Boolean): (VirtualDocument[_], String) = {
+    var path = ""
+    val doc = DocumentFactory.createOrReuseDocument(
+      uri,
+      schema,
+      reuseExisting,
+      _ => exists,
+      _ => { path = "open"; opened },
+      (_, _) => { path = "create"; created }
+    )
+    (doc, path)
+  }
+
+  "createOrReuseDocument" should
+    "open and return the existing document when the port reuses storage and one exists" in {
+    val (doc, path) = run(reuseExisting = true, exists = true)
+    path shouldBe "open"
+    doc should be theSameInstanceAs opened
+  }
+
+  it should "create when the port reuses storage but none exists yet" in {
+    val (doc, path) = run(reuseExisting = true, exists = false)
+    path shouldBe "create"
+    doc should be theSameInstanceAs created
+  }
+
+  it should "always create when the port does not reuse storage, even if one exists" in {
+    val (doc, path) = run(reuseExisting = false, exists = true)
+    path shouldBe "create"
+    doc should be theSameInstanceAs created
+  }
+
+  it should "not probe existence when the port does not reuse storage" in {
+    var probed = false
+    DocumentFactory.createOrReuseDocument(
+      uri,
+      schema,
+      reuseExisting = false,
+      _ => { probed = true; true },
+      _ => opened,
+      (_, _) => created
+    )
+    probed shouldBe false
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala
new file mode 100644
index 0000000000..7850d2b98f
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.metadata
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+ * Guard for the `OutputPort.reuseStorage` flag.
+ *
+ * The flag tells the region scheduler to reuse (append to) a port's storage
+ * across region re-executions instead of recreating it. Only an operator whose
+ * output accumulates across re-executions should set it -- today that is no
+ * operator on `main` (the only one that will, Loop End, is not yet merged).
+ *
+ * This pins the flag off for every registered operator so it can't be turned
+ * on unexpectedly. When the loop operators land, update this to allow Loop
+ * End's output port (and only it).
+ */
+class OutputPortReuseFlagSpec extends AnyFlatSpec with Matchers {
+
+  "No registered operator" should "enable OutputPort.reuseStorage on any of its output ports" in {
+    OperatorMetadataGenerator.operatorTypeMap.keys.foreach { opClass =>
+      opClass.getConstructor().newInstance().operatorInfo.outputPorts.foreach { port =>
+        withClue(s"${opClass.getSimpleName} / output port ${port.id}: ") {
+          port.reuseStorage shouldBe false
+        }
+      }
+    }
+  }
+}
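The two safety nets in this commit, the `require` guard in `RegionExecutionCoordinator` and the registry-wide `OutputPortReuseFlagSpec`, can be modeled together in a small sketch. It assumes a hypothetical `PortSketch` descriptor in place of the real `OutputPort` proto, and a caller-supplied `createOrReuse` function in place of `DocumentFactory.createOrReuseDocument`. `provision` mirrors the coordinator's shape (fail fast, then route the result and state URIs through the same decision), and `offendingPorts` is in the spirit of the spec. None of these names exist in Texera.

```scala
import java.net.URI

object ProvisioningGuardSketch {
  // Hypothetical, simplified port descriptor carrying only what this sketch needs.
  final case class PortSketch(opName: String, portId: Int, reuseStorage: Boolean)

  // Fail fast on the not-yet-supported flag, then provision the result and
  // state documents through the same create-or-reuse decision.
  def provision(port: PortSketch, resultURI: URI, stateURI: URI)(
      createOrReuse: (URI, Boolean) => String
  ): Seq[String] = {
    require(
      !port.reuseStorage,
      s"Output port ${port.opName}/${port.portId} set reuseStorage, which is not supported yet"
    )
    Seq(resultURI, stateURI).map(uri => createOrReuse(uri, port.reuseStorage))
  }

  // Registry-wide check: every port that enables the flag is an offender.
  def offendingPorts(registry: Seq[PortSketch]): Seq[PortSketch] =
    registry.filter(_.reuseStorage)
}
```

Because the guard runs before any document is touched, a port with `reuseStorage = true` fails with an `IllegalArgumentException` and provisions nothing, which is the "fails loudly" behavior the guard's comment describes. Until the guard is relaxed, the flag forwarded to the create-or-reuse call is therefore always `false`.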
