This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5707-b7fe06e9c22a03b4ac5ce201e9b7eb244cbc20eb
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 226ebde6d372d2e55d0c3a988ffd765288b4faa5
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jun 16 16:15:42 2026 -0700

    feat(scheduling): reuse output storage across region re-executions (#5707)
    
    ### What changes were proposed in this PR?
    
    Adds an opt-in mechanism for an output port to **reuse** its storage
    when the owning operator's region re-executes, instead of recreating the
    document each time. Dormant and behavior-preserving — no operator sets
    the flag in this PR.
    
    - `OutputPort` gains a `reuseStorage: Boolean` proto field (alongside
    `blocking` / `mode`). It marks a port whose output accumulates across
    region re-executions — e.g. a Loop End port whose result builds up over
    the iterations of its own loop.
    - `DocumentFactory.createOrReuseDocument(uri, schema, reuseExisting, …)`
    is the create-or-reuse decision: when reuse is requested and a document
    already exists it opens and returns that one; otherwise it creates a
    fresh one. It always returns the document, so the call site does not
    branch.
    - `RegionExecutionCoordinator` reads each output port's `reuseStorage`
    flag while provisioning that port's result/state documents and routes
    through `createOrReuseDocument`.
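    
    The effect of the create-or-reuse decision over repeated region
    re-executions can be illustrated with a minimal sketch. It assumes a
    hypothetical in-memory store (`MemStore`, `MemDoc`, `ReuseDemo` are
    illustrative names, not Texera types) whose `create` replaces any
    existing document, which is the behavior this PR describes for
    `createDocument`.
    
    ```scala
    import java.net.URI
    import scala.collection.mutable
    
    // Hypothetical in-memory stand-in for a stored document; not a Texera type.
    final class MemDoc(val uri: URI) {
      val rows: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty
    }
    
    // Hypothetical store whose `create` overrides any existing document.
    object MemStore {
      private val docs = mutable.Map.empty[URI, MemDoc]
    
      def exists(uri: URI): Boolean = docs.contains(uri)
      def open(uri: URI): MemDoc = docs(uri)
      def create(uri: URI): MemDoc = {
        val doc = new MemDoc(uri)
        docs(uri) = doc // replaces whatever was stored at this URI
        doc
      }
    
      // Same shape as the decision above: existence is probed only when
      // reuse is requested; otherwise a fresh document is always created.
      def createOrReuse(uri: URI, reuseExisting: Boolean): MemDoc =
        if (reuseExisting && exists(uri)) open(uri) else create(uri)
    }
    
    object ReuseDemo {
      // Simulate `runs` region re-executions that each append one row,
      // then report how many rows the stored document holds.
      def simulate(uri: URI, reuse: Boolean, runs: Int): Int = {
        (1 to runs).foreach { i =>
          MemStore.createOrReuse(uri, reuse).rows += s"iteration-$i"
        }
        MemStore.open(uri).rows.size
      }
    }
    ```
    
    With reuse enabled the rows from every run accumulate; without it each
    run starts from an empty document, so only the last run's row remains.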
    
    | port flag | region re-run behavior |
    |---|---|
    | `false` (every operator today) | recreate output/state documents (unchanged) |
    | `true` (set by Loop End in the loop PR) | keep and reopen the existing documents |
    
    A runtime guard in `RegionExecutionCoordinator` asserts no port sets
    `reuseStorage` for now: the flag activates only with the loop operators,
    which are not yet on `main`. The guard keeps the dormant reuse path from
    being silently exercised before its consumer exists, and is removed when
    the loop operators land.
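    
    As a rough sketch of how such a guard behaves, assuming a simplified,
    hypothetical port type (`PortSketch` and `GuardSketch` are illustrative
    and do not exist in Texera): Scala's `require` throws an
    `IllegalArgumentException` when its condition is false, so a port that
    sets the flag fails provisioning immediately.
    
    ```scala
    import scala.util.Try
    
    // Hypothetical, simplified port; the real OutputPort is generated from
    // workflow.proto.
    final case class PortSketch(name: String, reuseStorage: Boolean = false)
    
    object GuardSketch {
      // Fails loudly for any port that enables the flag; otherwise takes
      // the unchanged "recreate" path.
      def provision(port: PortSketch): String = {
        require(
          !port.reuseStorage,
          s"Output port ${port.name} set reuseStorage, which is not supported yet."
        )
        s"recreated storage for ${port.name}"
      }
    }
    ```
    
    Calling `GuardSketch.provision(PortSketch("loop-end", reuseStorage = true))`
    throws, while a port with the default flag value is provisioned as before.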
    
    ### Any related issues, documentation, discussions?
    
    Resolves #5709 (sub-issue of #4442 "Introduce for loop"). Split out of
    #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's
    [review](https://github.com/apache/texera/pull/4206#pullrequestreview-4482667715).
    
    ### How was this PR tested?
    
    - `DocumentFactorySpec` — pins the create-or-reuse decision (the reuse ×
    exists matrix plus the "no-reuse never probes existence" short-circuit)
    with injected document stubs, no iceberg backend.
    - `OutputPortReuseFlagSpec` — guards that no registered operator enables
    `reuseStorage` on any output port.
    - `WorkflowCore` / `WorkflowOperator` / `WorkflowExecutionService`
    compile; scalafmt + scalafix clean.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Co-authored with Claude Opus 4.8 in compliance with ASF.
---
 .../scheduling/RegionExecutionCoordinator.scala    | 29 ++++++-
 .../org/apache/texera/amber/core/workflow.proto    |  6 ++
 .../amber/core/storage/DocumentFactory.scala       | 22 +++++
 .../amber/core/storage/DocumentFactorySpec.scala   | 96 ++++++++++++++++++++++
 .../metadata/OutputPortReuseFlagSpec.scala         | 48 +++++++++++
 5 files changed, 199 insertions(+), 2 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 5a9df11b58..4497d7c4ae 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -576,8 +576,33 @@ class RegionExecutionCoordinator(
           region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
-        DocumentFactory.createDocument(resultURI, schema)
-        DocumentFactory.createDocument(stateURI, State.schema)
+        // An output port whose storage accumulates across region re-executions
+        // (e.g. a LoopEnd port, whose output builds up over the iterations of
+        // its own loop) sets `reuseStorage`. When set, the port's existing
+        // document is kept and reopened on each re-run; when unset, a fresh one
+        // is created. Read per output port -- storage behavior is port-specific.
+        // (The inner LoopEnd of a nested loop additionally drops its output
+        // once per outer iteration on the Python worker side in
+        // MainLoop._process_state_frame, which is orthogonal to this.)
+        val reuseStorage =
+          region
+            .getOperator(outputPortId.opId)
+            .outputPorts(outputPortId.portId)
+            ._1
+            .reuseStorage
+        // Guard: no operator enables reuseStorage in production yet -- it
+        // activates with the loop operators, which aren't on main. Until then
+        // this fails loudly so the dormant reuse path is never silently
+        // exercised. Remove/relax this guard when introducing the loop operators.
+        require(
+          !reuseStorage,
+          s"Output port $outputPortId set reuseStorage, which is not " +
+            "supported in production yet (it activates with the loop operators)."
+        )
+        Seq((resultURI, schema), (stateURI, State.schema)).foreach {
+          case (uri, sch) =>
+            DocumentFactory.createOrReuseDocument(uri, sch, reuseStorage)
+        }
         if (!isRestart) {
           val (_, eid, _, _) = decodeURI(resultURI)
           WorkflowExecutionsResource.insertOperatorPortResultUri(
diff --git a/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto b/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto
index c5b2cb248f..b3b72ed924 100644
--- a/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto
+++ b/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto
@@ -62,6 +62,12 @@ message OutputPort {
   string displayName = 2;
   bool blocking = 3;
   OutputMode mode = 4;
+  // Whether storage at this port persists across the owning operator's region
+  // re-executions: when set, the existing document is kept and appended to on
+  // each re-run; when unset, it is recreated. Set e.g. on a LoopEnd port whose
+  // output accumulates across the iterations of its own loop. The region
+  // scheduler reads this when provisioning the port's output document.
+  bool reuseStorage = 5;
 }
 
 
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index d3fcae868f..a26340e79c 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -133,6 +133,28 @@ object DocumentFactory {
     }
   }
 
+  /**
+    * Return the document at `uri`: when `reuseExisting` is set and a document
+    * already exists there, open and return the existing one -- so a caller whose
+    * output accumulates across re-runs (e.g. a LoopEnd port whose region
+    * re-executes once per loop iteration) keeps the already-populated document
+    * instead of clobbering it, since `createDocument` overrides any existing
+    * document. Otherwise create it.
+    *
+    * `exists` / `open` / `create` default to this object's own `documentExists`
+    * / `openDocument` / `createDocument`; they are parameterized only so the
+    * create-or-reuse decision can be unit-tested without an iceberg backend.
+    */
+  def createOrReuseDocument(
+      uri: URI,
+      schema: Schema,
+      reuseExisting: Boolean,
+      exists: URI => Boolean = documentExists,
+      open: URI => VirtualDocument[_] = (u: URI) => openDocument(u)._1,
+      create: (URI, Schema) => VirtualDocument[_] = createDocument
+  ): VirtualDocument[_] =
+    if (reuseExisting && exists(uri)) open(uri) else create(uri, schema)
+
   /**
     * Open a document specified by the uri.
     * If the document is storing structural data, the schema will also be returned
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala
new file mode 100644
index 0000000000..d67b90f363
--- /dev/null
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.storage
+
+import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.tuple.{Schema, Tuple}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.net.URI
+
+/**
+  * Unit tests for `DocumentFactory.createOrReuseDocument`, the create-or-reuse
+  * decision behind output-port storage provisioning. It always returns the
+  * document (opened when reused, created otherwise) so the call site doesn't
+  * branch.
+  *
+  * `exists` / `open` / `create` are injected so the decision can be pinned with
+  * trivial document stubs -- no iceberg backend, no live region.
+  */
+class DocumentFactorySpec extends AnyFlatSpec with Matchers {
+
+  private val uri = new URI("vfs:///wf/result/loop-end")
+  private val schema = Schema()
+
+  private def stubDoc: VirtualDocument[Tuple] =
+    new VirtualDocument[Tuple] {
+      override def getURI: URI = uri
+      override def clear(): Unit = ()
+    }
+  private val opened: VirtualDocument[_] = stubDoc
+  private val created: VirtualDocument[_] = stubDoc
+
+  /** Run with spies; return (document handed back, which path was taken). */
+  private def run(reuseExisting: Boolean, exists: Boolean): (VirtualDocument[_], String) = {
+    var path = ""
+    val doc = DocumentFactory.createOrReuseDocument(
+      uri,
+      schema,
+      reuseExisting,
+      _ => exists,
+      _ => { path = "open"; opened },
+      (_, _) => { path = "create"; created }
+    )
+    (doc, path)
+  }
+
+  "createOrReuseDocument" should
+    "open and return the existing document when the port reuses storage and one exists" in {
+    val (doc, path) = run(reuseExisting = true, exists = true)
+    path shouldBe "open"
+    doc should be theSameInstanceAs opened
+  }
+
+  it should "create when the port reuses storage but none exists yet" in {
+    val (doc, path) = run(reuseExisting = true, exists = false)
+    path shouldBe "create"
+    doc should be theSameInstanceAs created
+  }
+
+  it should "always create when the port does not reuse storage, even if one exists" in {
+    val (doc, path) = run(reuseExisting = false, exists = true)
+    path shouldBe "create"
+    doc should be theSameInstanceAs created
+  }
+
+  it should "not probe existence when the port does not reuse storage" in {
+    var probed = false
+    DocumentFactory.createOrReuseDocument(
+      uri,
+      schema,
+      reuseExisting = false,
+      _ => { probed = true; true },
+      _ => opened,
+      (_, _) => created
+    )
+    probed shouldBe false
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala
new file mode 100644
index 0000000000..7850d2b98f
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.metadata
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+  * Guard for the `OutputPort.reuseStorage` flag.
+  *
+  * The flag tells the region scheduler to reuse (append to) a port's storage
+  * across region re-executions instead of recreating it. Only an operator whose
+  * output accumulates across re-executions should set it -- today that is no
+  * operator on `main` (the only one that will, Loop End, is not yet merged).
+  *
+  * This pins the flag off for every registered operator so it can't be turned
+  * on unexpectedly. When the loop operators land, update this to allow Loop
+  * End's output port (and only it).
+  */
+class OutputPortReuseFlagSpec extends AnyFlatSpec with Matchers {
+
+  "No registered operator" should "enable OutputPort.reuseStorage on any of its output ports" in {
+    OperatorMetadataGenerator.operatorTypeMap.keys.foreach { opClass =>
+      opClass.getConstructor().newInstance().operatorInfo.outputPorts.foreach { port =>
+        withClue(s"${opClass.getSimpleName} / output port ${port.id}: ") {
+          port.reuseStorage shouldBe false
+        }
+      }
+    }
+  }
+}
