This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new 2095b592ee docs+test(loop): document and guard OutputManager.reset_output_storage
2095b592ee is described below
commit 2095b592ee4aab7aa0b909c37f6534a21f75692a
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 12 16:45:12 2026 -0700
docs+test(loop): document and guard OutputManager.reset_output_storage
Addresses PR #4206 review feedback
(https://github.com/apache/texera/pull/4206#discussion_r3400851469): an
earlier reply claimed reset_storage had been renamed to
reset_output_storage, documented, and guarded, but that work was lost
when the branch was rebased -- the method was back to a bare
reset_storage with no docstring and no checks, and the reason
truncation is safe lived only in the PR description.
Re-apply on the consolidated branch:
* Rename reset_storage -> reset_output_storage, and update main_loop's
caller plus the two __init__/set_up comments (which still referred to
a stale `reset_loopend_storage` name) to use the new name.
* Add a docstring: what it does (drop+recreate the result AND state
tables, reopen the writers), that it's called only by a Loop End
worker once per iteration, and -- the rationale that previously
lived only in the PR description -- WHY truncating live storage is
safe: loops run in MATERIALIZED mode, so downstream operators don't
read this output until the loop region completes, so no reader can
observe an intermediate truncation.
* Add the two previously-implicit precondition guards: exactly one
output port, and the storage writer already set up
(_storage_uri_base populated). Both raise a clear RuntimeError
instead of silently resetting only the first of several ports,
failing with an opaque IndexError when there is no port, or
re-provisioning storage from a None base URI.
* test_output_manager.py: new TestResetOutputStorage covering the
happy path (close -> recreate result+state docs -> reopen writer)
and each guard failure (no output port, multiple output ports,
storage writer not set up), with iceberg/thread collaborators mocked.
---
.../core/architecture/packaging/output_manager.py | 39 ++++++++-
amber/src/main/python/core/runnables/main_loop.py | 2 +-
.../architecture/packaging/test_output_manager.py | 92 +++++++++++++++++++++-
3 files changed, 127 insertions(+), 6 deletions(-)
diff --git
a/amber/src/main/python/core/architecture/packaging/output_manager.py
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index a84b651f78..a349fb369c 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -93,7 +93,7 @@ class OutputManager:
] = dict()
# Loop-end operators have a single output port; remember its base
- # URI so `reset_loopend_storage` can re-provision the iceberg
+ # URI so `reset_output_storage` can re-provision the iceberg
# tables on each loop iteration.
self._storage_uri_base: typing.Optional[str] = None
@@ -138,7 +138,7 @@ class OutputManager:
state materialization on the same port. `storage_uri_base` is the
port's base URI; the result and state URIs are derived from it.
"""
- # Remember the base URI so `reset_loopend_storage` can re-provision
+ # Remember the base URI so `reset_output_storage` can re-provision
# the iceberg tables on subsequent loop iterations.
self._storage_uri_base = storage_uri_base
document, _ = DocumentFactory.open_document(
@@ -234,8 +234,39 @@ class OutputManager:
elif port_id in self._port_state_writers:
self._port_state_writers[port_id][0].put(element)
- def reset_storage(self) -> None:
- port_id = self.get_port_ids()[0]
+ def reset_output_storage(self) -> None:
+ """Drop and recreate this operator's result and state tables, then
+ reopen the storage writers against the empty tables.
+
+ Called only by a Loop End worker, once per loop iteration (see the
+ ``LoopEndOperator`` branch in ``MainLoop.process_input_state``). Each
+ iteration must start from empty tables so the materialization the
+ downstream eventually reads holds only the final iteration's rows
+ rather than every iteration's rows concatenated.
+
+ Truncating live storage is safe here because a workflow containing a
+ loop runs in MATERIALIZED execution mode: downstream operators do not
+ start reading this output until the loop region has fully completed,
+ so no reader can observe an intermediate truncation.
+
+ Preconditions (always satisfied by a Loop End worker): the operator
+ has exactly one output port, and ``set_up_port_storage_writer`` has
+ already run for it (so ``_storage_uri_base`` is populated). Both are
+ checked so future misuse fails loudly instead of silently resetting
+ the wrong port or dereferencing ``None``.
+ """
+ port_ids = self.get_port_ids()
+ if len(port_ids) != 1:
+ raise RuntimeError(
+ f"reset_output_storage expects exactly one output port, "
+ f"but found {len(port_ids)}"
+ )
+ if self._storage_uri_base is None:
+ raise RuntimeError(
+ "reset_output_storage called before the output port's storage "
+ "writer was set up"
+ )
+ port_id = port_ids[0]
storage_uri_base = self._storage_uri_base
self.close_port_storage_writers()
DocumentFactory.create_document(
diff --git a/amber/src/main/python/core/runnables/main_loop.py
b/amber/src/main/python/core/runnables/main_loop.py
index 95bca48daa..93b4af9873 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -264,7 +264,7 @@ class MainLoop(StoppableQueueBlockingRunnable):
if output_state is not None:
executor = self.context.executor_manager.executor
if isinstance(executor, LoopEndOperator):
- self.context.output_manager.reset_storage()
+ self.context.output_manager.reset_output_storage()
elif isinstance(executor, LoopStartOperator):
# A LoopStart stamps its own id/uri onto the state it emits.
(
diff --git
a/amber/src/test/python/core/architecture/packaging/test_output_manager.py
b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
index 95e03dca63..a8fec1b72d 100644
--- a/amber/src/test/python/core/architecture/packaging/test_output_manager.py
+++ b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
import pytest
@@ -105,3 +105,93 @@ class TestSaveStateToStorageIfNeeded:
thread_a.join.assert_called_once()
thread_b.join.assert_called_once()
assert output_manager._port_state_writers == {}
+
+
+class TestResetOutputStorage:
+ """Covers OutputManager.reset_output_storage, the per-iteration
+ result+state table reset a Loop End worker runs between loop
+ iterations.
+
+ The collaborators that touch real iceberg storage / writer threads
+ (DocumentFactory, close_port_storage_writers,
+ set_up_port_storage_writer) are replaced with spies so these tests
+ stay hermetic and assert the contract: drop+recreate both tables,
+ bracketed by closing the old writers and opening fresh ones, with
+ both preconditions enforced.
+ """
+
+ @pytest.fixture
+ def output_manager(self):
+ return OutputManager(worker_id="Worker:WF0-test-op-main-0")
+
+ @staticmethod
+ def _add_port_with_storage(om, port_id, uri, schema):
+ # Stand in for what add_output_port + set_up_port_storage_writer
+ # populate, without spinning up real iceberg tables and threads.
+ port = MagicMock()
+ port.get_schema.return_value = schema
+ om._ports[port_id] = port
+ om._storage_uri_base = uri
+
+ def test_recreates_result_and_state_tables_and_reopens_writer(self,
output_manager):
+ port_id = PortIdentity(id=0, internal=False)
+ schema = MagicMock(name="schema")
+ self._add_port_with_storage(output_manager, port_id, "vfs:///base",
schema)
+
+ output_manager.close_port_storage_writers = MagicMock()
+ output_manager.set_up_port_storage_writer = MagicMock()
+
+ with (
+ patch(
+ "core.architecture.packaging.output_manager.DocumentFactory"
+ ) as doc_factory,
+ patch(
+ "core.architecture.packaging.output_manager.VFSURIFactory"
+ ) as uri_factory,
+ ):
+ uri_factory.result_uri.return_value = "vfs:///base/result"
+ uri_factory.state_uri.return_value = "vfs:///base/state"
+ output_manager.reset_output_storage()
+
+ # Both the result and the state table are recreated, which drops
+ # the rows the previous loop iteration wrote.
+ recreated = {
+ call.args[0] for call in doc_factory.create_document.call_args_list
+ }
+ assert recreated == {"vfs:///base/result", "vfs:///base/state"}
+ # The old writers are flushed/closed first, and fresh writers are
+ # opened against the recreated tables afterwards.
+ output_manager.close_port_storage_writers.assert_called_once_with()
+ output_manager.set_up_port_storage_writer.assert_called_once_with(
+ port_id, "vfs:///base"
+ )
+
+ def test_raises_when_no_output_port(self, output_manager):
+ output_manager._storage_uri_base = "vfs:///base"
+ output_manager.close_port_storage_writers = MagicMock()
+ with
patch("core.architecture.packaging.output_manager.DocumentFactory"):
+ with pytest.raises(RuntimeError, match="exactly one output port"):
+ output_manager.reset_output_storage()
+ # Must fail before touching storage.
+ output_manager.close_port_storage_writers.assert_not_called()
+
+ def test_raises_when_multiple_output_ports(self, output_manager):
+ schema = MagicMock()
+ self._add_port_with_storage(
+ output_manager, PortIdentity(id=0, internal=False), "vfs:///base",
schema
+ )
+ # A second port makes the count != 1; the shared _storage_uri_base
+ # is already set, so the port-count guard is what must trip.
+ output_manager._ports[PortIdentity(id=1, internal=False)] = MagicMock()
+ with pytest.raises(RuntimeError, match="exactly one output port"):
+ output_manager.reset_output_storage()
+
+ def test_raises_when_storage_writer_not_set_up(self, output_manager):
+ # The port exists but no storage URI was assigned -- i.e.
+ # set_up_port_storage_writer never ran for it.
+ output_manager._ports[PortIdentity(id=0, internal=False)] = MagicMock()
+ output_manager.close_port_storage_writers = MagicMock()
+ with
patch("core.architecture.packaging.output_manager.DocumentFactory"):
+ with pytest.raises(RuntimeError, match="storage writer was set
up"):
+ output_manager.reset_output_storage()
+ output_manager.close_port_storage_writers.assert_not_called()