This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 60d3b2e6db refactor(loop): hoist shared test stubs and fold provisionOutputDocument calls
60d3b2e6db is described below

commit 60d3b2e6dbf2f47189ed2c4526a04dffc20938c9
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 13 16:40:07 2026 -0700

    refactor(loop): hoist shared test stubs and fold provisionOutputDocument calls
    
    More line-count cleanup (the deferred items from the prior review; no
    behavior change):
    
    - test_main_loop: hoist the inline `StubLoopEnd(condition -> False)`
      (repeated across 4 tests) to a module-level `_FalseLoopEnd`, and the
      duplicated `_Reader`/`_R` materialization-reader stub (3 copies) to a
      module-level `_MatReader`.
    - test_loop_operators: extract `_ipc_one_row()` for the one-row Arrow-IPC
      table literal built inline in 4 LoopEnd tests.
    - RegionExecutionCoordinator.createOutputPortStorageObjects: provision the
      result and state documents in one `Seq(...).foreach` instead of two
      near-identical `provisionOutputDocument` calls.
    
    Verified: test_loop_operators + test_main_loop + test_output_manager (51)
    pass, ruff clean; amber Test/compile + scalafmt clean.
---
 .../scheduling/RegionExecutionCoordinator.scala    | 21 ++++----
 .../test/python/core/models/test_loop_operators.py | 13 +++--
 .../test/python/core/runnables/test_main_loop.py   | 61 +++++++++-------------
 3 files changed, 43 insertions(+), 52 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 4c9086c9b6..9d43f4e18b 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -619,18 +619,15 @@ class RegionExecutionCoordinator(
         // MainLoop._process_state_frame -- which is orthogonal to this
         // region-provisioning reuse.)
         val reusesOutputStorage = region.getOperators.exists(_.reusesOutputStorageOnReExecution)
-        RegionExecutionCoordinator.provisionOutputDocument(
-          resultURI,
-          reusesOutputStorage,
-          DocumentFactory.documentExists,
-          uri => DocumentFactory.createDocument(uri, schema)
-        )
-        RegionExecutionCoordinator.provisionOutputDocument(
-          stateURI,
-          reusesOutputStorage,
-          DocumentFactory.documentExists,
-          uri => DocumentFactory.createDocument(uri, State.schema)
-        )
+        Seq((resultURI, schema), (stateURI, State.schema)).foreach {
+          case (uri, sch) =>
+            RegionExecutionCoordinator.provisionOutputDocument(
+              uri,
+              reusesOutputStorage,
+              DocumentFactory.documentExists,
+              u => DocumentFactory.createDocument(u, sch)
+            )
+        }
         if (!isRestart) {
           val (_, eid, _, _) = decodeURI(resultURI)
           WorkflowExecutionsResource.insertOperatorPortResultUri(
diff --git a/amber/src/test/python/core/models/test_loop_operators.py b/amber/src/test/python/core/models/test_loop_operators.py
index c2d52e6064..b45fd2e377 100644
--- a/amber/src/test/python/core/models/test_loop_operators.py
+++ b/amber/src/test/python/core/models/test_loop_operators.py
@@ -113,6 +113,11 @@ class _StubLoopEnd(LoopEndOperator):
 # ---------------------------------------------------------------------------
 
 
+def _ipc_one_row():
+    """One-row table as Arrow IPC bytes (the loop `table` payload)."""
+    return table_to_ipc_bytes(Table([Tuple({"v": 1})]))
+
+
 class TestLoopStartProcessState:
     def test_first_time_state_is_merged_into_self_state_and_none_is_returned(self):
         # First entry: state from upstream (no LoopStartStateURI). The
@@ -254,7 +259,7 @@ class TestLoopEndBase:
         op = _StubLoopEnd(update="i += 1", condition_expr="i < 3")
         assert op._loop_table is None
         op.process_state(
-            State({"i": 0, "table": table_to_ipc_bytes(Table([Tuple({"v": 1})]))}),
+            State({"i": 0, "table": _ipc_one_row()}),
             port=0,
         )
         assert op._loop_table is not None
@@ -281,7 +286,7 @@ class TestLoopEndMatchingBranch:
         incoming = State(
             {
                 "i": 2,
-                "table": table_to_ipc_bytes(Table([Tuple({"v": 1})])),
+                "table": _ipc_one_row(),
             }
         )
 
@@ -303,7 +308,7 @@ class TestLoopEndMatchingBranch:
             State(
                 {
                     "i": 1,
-                    "table": table_to_ipc_bytes(Table([Tuple({"v": 1})])),
+                    "table": _ipc_one_row(),
                 }
             ),
             port=0,
@@ -315,7 +320,7 @@ class TestLoopEndMatchingBranch:
             State(
                 {
                     "i": 2,
-                    "table": table_to_ipc_bytes(Table([Tuple({"v": 1})])),
+                    "table": _ipc_one_row(),
                 }
             ),
             port=0,
diff --git a/amber/src/test/python/core/runnables/test_main_loop.py b/amber/src/test/python/core/runnables/test_main_loop.py
index 6df9f96055..3c62290881 100644
--- a/amber/src/test/python/core/runnables/test_main_loop.py
+++ b/amber/src/test/python/core/runnables/test_main_loop.py
@@ -88,6 +88,18 @@ from pytexera.udf.examples.count_batch_operator import CountBatchOperator
 from pytexera.udf.examples.echo_operator import EchoOperator
 
 
+class _FalseLoopEnd(LoopEndOperator):
+    def condition(self):
+        return False
+
+
+class _MatReader:
+    """Minimal input-port materialization reader stub carrying a uri."""
+
+    def __init__(self, uri):
+        self.uri = uri
+
+
 class TestMainLoop:
     @pytest.fixture
     def command_sequence(self):
@@ -1927,11 +1939,7 @@ class TestMainLoop:
         # loop: the runtime decrements and forwards, skipping the operator. It
         # also resets this (inner) LoopEnd's output storage -- the outer loop
         # advancing is the signal to drop the previous outer iteration's rows.
-        class StubLoopEnd(LoopEndOperator):
-            def condition(self):
-                return False
-
-        main_loop.context.executor_manager.executor = StubLoopEnd()
+        main_loop.context.executor_manager.executor = _FalseLoopEnd()
         emitted, switched, reset_calls = self._capture_state_emit(
             main_loop, monkeypatch
         )
@@ -1960,11 +1968,7 @@ class TestMainLoop:
         # loop_counter == 0 is the matching loop: the runtime runs the operator
         # (consume) via the context switch. The operator returns None, so no
         # state is emitted; the loop-back is driven by complete() separately.
-        class StubLoopEnd(LoopEndOperator):
-            def condition(self):
-                return False
-
-        main_loop.context.executor_manager.executor = StubLoopEnd()
+        main_loop.context.executor_manager.executor = _FalseLoopEnd()
         emitted, switched, reset_calls = self._capture_state_emit(
             main_loop, monkeypatch
         )
@@ -1986,11 +1990,7 @@ class TestMainLoop:
         # the previous outer iteration's rows are dropped before this outer
         # iteration's inner results accumulate. It must NOT invoke the operator,
         # and forwards the outer state one level out (loop_counter - 1).
-        class StubLoopEnd(LoopEndOperator):
-            def condition(self):
-                return False
-
-        main_loop.context.executor_manager.executor = StubLoopEnd()
+        main_loop.context.executor_manager.executor = _FalseLoopEnd()
         emitted, switched, reset_calls = self._capture_state_emit(
             main_loop, monkeypatch
         )
@@ -2023,11 +2023,7 @@ class TestMainLoop:
         # metadata must yield a user-facing `current_input_state` that
         # contains only the inner State's keys -- never the envelope
         # names.
-        class StubLoopEnd(LoopEndOperator):
-            def condition(self):
-                return False
-
-        main_loop.context.executor_manager.executor = StubLoopEnd()
+        main_loop.context.executor_manager.executor = _FalseLoopEnd()
         # Standard stubs: emit/save/switch don't fire real work. The
         # consume branch sets `current_input_state` to the inner State
         # BEFORE the (stubbed) context switch, so the assertion below
@@ -2083,11 +2079,7 @@ class TestMainLoop:
         `input_manager.get_input_port_mat_reader_threads` returns: one
         port -> one reader runnable carrying the given URI."""
 
-        class _Reader:
-            def __init__(self, u):
-                self.uri = u
-
-        return {PortIdentity(0, internal=False): [_Reader(uri)]}
+        return {PortIdentity(0, internal=False): [_MatReader(uri)]}
 
     def test_compute_loop_start_id_parses_worker_id_via_canonical_helper(
         self, main_loop, monkeypatch
@@ -2139,13 +2131,9 @@ class TestMainLoop:
         # surfaces the misconfiguration loudly instead.
         main_loop.context.worker_id = "Worker:WF1-LoopStart-op-abc-main-0"
 
-        class _R:
-            def __init__(self, u):
-                self.uri = u
-
         two_ports = {
-            PortIdentity(0, internal=False): [_R("vfs:///a")],
-            PortIdentity(1, internal=False): [_R("vfs:///b")],
+            PortIdentity(0, internal=False): [_MatReader("vfs:///a")],
+            PortIdentity(1, internal=False): [_MatReader("vfs:///b")],
         }
         monkeypatch.setattr(
             main_loop.context.input_manager,
@@ -2163,14 +2151,15 @@ class TestMainLoop:
         # reader runnable would have silently picked the first.
         main_loop.context.worker_id = "Worker:WF1-LoopStart-op-abc-main-0"
 
-        class _R:
-            def __init__(self, u):
-                self.uri = u
-
         monkeypatch.setattr(
             main_loop.context.input_manager,
             "get_input_port_mat_reader_threads",
-            lambda: {PortIdentity(0, internal=False): [_R("vfs:///a"), _R("vfs:///b")]},
+            lambda: {
+                PortIdentity(0, internal=False): [
+                    _MatReader("vfs:///a"),
+                    _MatReader("vfs:///b"),
+                ]
+            },
         )
 
         with pytest.raises(RuntimeError, match="exactly one input reader"):

Reply via email to