This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new 60d3b2e6db refactor(loop): hoist shared test stubs and fold provisionOutputDocument calls
60d3b2e6db is described below
commit 60d3b2e6dbf2f47189ed2c4526a04dffc20938c9
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 13 16:40:07 2026 -0700
refactor(loop): hoist shared test stubs and fold provisionOutputDocument calls
More line-count cleanup (the items deferred from the prior review; no behavior change):
- test_main_loop: hoist the inline `StubLoopEnd` stub (whose `condition()` returns False), repeated across 4 tests, to a module-level `_FalseLoopEnd`, and hoist the duplicated `_Reader`/`_R` materialization-reader stub (3 copies) to a module-level `_MatReader`.
- test_loop_operators: extract `_ipc_one_row()` for the one-row Arrow-IPC table literal that was previously built inline at 4 sites in the LoopEnd tests.
- RegionExecutionCoordinator.createOutputPortStorageObjects: provision the result and state documents in a single `Seq(...).foreach` (same order: result first, then state) instead of two near-identical `provisionOutputDocument` calls.
Verified: test_loop_operators, test_main_loop, and test_output_manager (51 tests) pass; ruff clean; amber Test/compile and scalafmt clean.
---
.../scheduling/RegionExecutionCoordinator.scala | 21 ++++----
.../test/python/core/models/test_loop_operators.py | 13 +++--
.../test/python/core/runnables/test_main_loop.py | 61 +++++++++-------------
3 files changed, 43 insertions(+), 52 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 4c9086c9b6..9d43f4e18b 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -619,18 +619,15 @@ class RegionExecutionCoordinator(
// MainLoop._process_state_frame -- which is orthogonal to this
// region-provisioning reuse.)
val reusesOutputStorage =
region.getOperators.exists(_.reusesOutputStorageOnReExecution)
- RegionExecutionCoordinator.provisionOutputDocument(
- resultURI,
- reusesOutputStorage,
- DocumentFactory.documentExists,
- uri => DocumentFactory.createDocument(uri, schema)
- )
- RegionExecutionCoordinator.provisionOutputDocument(
- stateURI,
- reusesOutputStorage,
- DocumentFactory.documentExists,
- uri => DocumentFactory.createDocument(uri, State.schema)
- )
+ Seq((resultURI, schema), (stateURI, State.schema)).foreach {
+ case (uri, sch) =>
+ RegionExecutionCoordinator.provisionOutputDocument(
+ uri,
+ reusesOutputStorage,
+ DocumentFactory.documentExists,
+ u => DocumentFactory.createDocument(u, sch)
+ )
+ }
if (!isRestart) {
val (_, eid, _, _) = decodeURI(resultURI)
WorkflowExecutionsResource.insertOperatorPortResultUri(
diff --git a/amber/src/test/python/core/models/test_loop_operators.py b/amber/src/test/python/core/models/test_loop_operators.py
index c2d52e6064..b45fd2e377 100644
--- a/amber/src/test/python/core/models/test_loop_operators.py
+++ b/amber/src/test/python/core/models/test_loop_operators.py
@@ -113,6 +113,11 @@ class _StubLoopEnd(LoopEndOperator):
# ---------------------------------------------------------------------------
+def _ipc_one_row():
+ """One-row table as Arrow IPC bytes (the loop `table` payload)."""
+ return table_to_ipc_bytes(Table([Tuple({"v": 1})]))
+
+
class TestLoopStartProcessState:
def
test_first_time_state_is_merged_into_self_state_and_none_is_returned(self):
# First entry: state from upstream (no LoopStartStateURI). The
@@ -254,7 +259,7 @@ class TestLoopEndBase:
op = _StubLoopEnd(update="i += 1", condition_expr="i < 3")
assert op._loop_table is None
op.process_state(
- State({"i": 0, "table": table_to_ipc_bytes(Table([Tuple({"v":
1})]))}),
+ State({"i": 0, "table": _ipc_one_row()}),
port=0,
)
assert op._loop_table is not None
@@ -281,7 +286,7 @@ class TestLoopEndMatchingBranch:
incoming = State(
{
"i": 2,
- "table": table_to_ipc_bytes(Table([Tuple({"v": 1})])),
+ "table": _ipc_one_row(),
}
)
@@ -303,7 +308,7 @@ class TestLoopEndMatchingBranch:
State(
{
"i": 1,
- "table": table_to_ipc_bytes(Table([Tuple({"v": 1})])),
+ "table": _ipc_one_row(),
}
),
port=0,
@@ -315,7 +320,7 @@ class TestLoopEndMatchingBranch:
State(
{
"i": 2,
- "table": table_to_ipc_bytes(Table([Tuple({"v": 1})])),
+ "table": _ipc_one_row(),
}
),
port=0,
diff --git a/amber/src/test/python/core/runnables/test_main_loop.py b/amber/src/test/python/core/runnables/test_main_loop.py
index 6df9f96055..3c62290881 100644
--- a/amber/src/test/python/core/runnables/test_main_loop.py
+++ b/amber/src/test/python/core/runnables/test_main_loop.py
@@ -88,6 +88,18 @@ from pytexera.udf.examples.count_batch_operator import
CountBatchOperator
from pytexera.udf.examples.echo_operator import EchoOperator
+class _FalseLoopEnd(LoopEndOperator):
+ def condition(self):
+ return False
+
+
+class _MatReader:
+ """Minimal input-port materialization reader stub carrying a uri."""
+
+ def __init__(self, uri):
+ self.uri = uri
+
+
class TestMainLoop:
@pytest.fixture
def command_sequence(self):
@@ -1927,11 +1939,7 @@ class TestMainLoop:
# loop: the runtime decrements and forwards, skipping the operator. It
# also resets this (inner) LoopEnd's output storage -- the outer loop
# advancing is the signal to drop the previous outer iteration's rows.
- class StubLoopEnd(LoopEndOperator):
- def condition(self):
- return False
-
- main_loop.context.executor_manager.executor = StubLoopEnd()
+ main_loop.context.executor_manager.executor = _FalseLoopEnd()
emitted, switched, reset_calls = self._capture_state_emit(
main_loop, monkeypatch
)
@@ -1960,11 +1968,7 @@ class TestMainLoop:
# loop_counter == 0 is the matching loop: the runtime runs the operator
# (consume) via the context switch. The operator returns None, so no
# state is emitted; the loop-back is driven by complete() separately.
- class StubLoopEnd(LoopEndOperator):
- def condition(self):
- return False
-
- main_loop.context.executor_manager.executor = StubLoopEnd()
+ main_loop.context.executor_manager.executor = _FalseLoopEnd()
emitted, switched, reset_calls = self._capture_state_emit(
main_loop, monkeypatch
)
@@ -1986,11 +1990,7 @@ class TestMainLoop:
# the previous outer iteration's rows are dropped before this outer
# iteration's inner results accumulate. It must NOT invoke the
operator,
# and forwards the outer state one level out (loop_counter - 1).
- class StubLoopEnd(LoopEndOperator):
- def condition(self):
- return False
-
- main_loop.context.executor_manager.executor = StubLoopEnd()
+ main_loop.context.executor_manager.executor = _FalseLoopEnd()
emitted, switched, reset_calls = self._capture_state_emit(
main_loop, monkeypatch
)
@@ -2023,11 +2023,7 @@ class TestMainLoop:
# metadata must yield a user-facing `current_input_state` that
# contains only the inner State's keys -- never the envelope
# names.
- class StubLoopEnd(LoopEndOperator):
- def condition(self):
- return False
-
- main_loop.context.executor_manager.executor = StubLoopEnd()
+ main_loop.context.executor_manager.executor = _FalseLoopEnd()
# Standard stubs: emit/save/switch don't fire real work. The
# consume branch sets `current_input_state` to the inner State
# BEFORE the (stubbed) context switch, so the assertion below
@@ -2083,11 +2079,7 @@ class TestMainLoop:
`input_manager.get_input_port_mat_reader_threads` returns: one
port -> one reader runnable carrying the given URI."""
- class _Reader:
- def __init__(self, u):
- self.uri = u
-
- return {PortIdentity(0, internal=False): [_Reader(uri)]}
+ return {PortIdentity(0, internal=False): [_MatReader(uri)]}
def test_compute_loop_start_id_parses_worker_id_via_canonical_helper(
self, main_loop, monkeypatch
@@ -2139,13 +2131,9 @@ class TestMainLoop:
# surfaces the misconfiguration loudly instead.
main_loop.context.worker_id = "Worker:WF1-LoopStart-op-abc-main-0"
- class _R:
- def __init__(self, u):
- self.uri = u
-
two_ports = {
- PortIdentity(0, internal=False): [_R("vfs:///a")],
- PortIdentity(1, internal=False): [_R("vfs:///b")],
+ PortIdentity(0, internal=False): [_MatReader("vfs:///a")],
+ PortIdentity(1, internal=False): [_MatReader("vfs:///b")],
}
monkeypatch.setattr(
main_loop.context.input_manager,
@@ -2163,14 +2151,15 @@ class TestMainLoop:
# reader runnable would have silently picked the first.
main_loop.context.worker_id = "Worker:WF1-LoopStart-op-abc-main-0"
- class _R:
- def __init__(self, u):
- self.uri = u
-
monkeypatch.setattr(
main_loop.context.input_manager,
"get_input_port_mat_reader_threads",
- lambda: {PortIdentity(0, internal=False): [_R("vfs:///a"),
_R("vfs:///b")]},
+ lambda: {
+ PortIdentity(0, internal=False): [
+ _MatReader("vfs:///a"),
+ _MatReader("vfs:///b"),
+ ]
+ },
)
with pytest.raises(RuntimeError, match="exactly one input reader"):