This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-state-materialization 
by this push:
     new 4428c9d5b0 update
4428c9d5b0 is described below

commit 4428c9d5b0c9db35d6a018e9bb5571784c4ce735
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed May 6 03:08:40 2026 -0700

    update
---
 .../core/architecture/packaging/output_manager.py  |  46 +++++++---
 .../messaginglayer/OutputManager.scala             |  16 ++--
 .../architecture/packaging/test_output_manager.py  | 100 +++++++++------------
 3 files changed, 82 insertions(+), 80 deletions(-)

diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py
index 5614a64a6b..1220a9f15f 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -87,7 +87,9 @@ class OutputManager:
             PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
         ] = dict()
 
-        self._state_writers: typing.Dict[PortIdentity, typing.Any] = dict()
+        self._port_state_writers: typing.Dict[
+            PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
+        ] = dict()
 
     def is_missing_output_ports(self):
         """
@@ -150,9 +152,25 @@ class OutputManager:
         state_document, _ = DocumentFactory.open_document(
             State.uri_from_result_uri(storage_uri)
         )
-        state_writer = state_document.writer(str(get_worker_index(self.worker_id)))
-        state_writer.open()
-        self._state_writers[port_id] = state_writer
+        state_buffered_item_writer = state_document.writer(
+            str(get_worker_index(self.worker_id))
+        )
+        state_writer_queue = Queue()
+        state_port_writer = PortStorageWriter(
+            buffered_item_writer=state_buffered_item_writer,
+            queue=state_writer_queue,
+        )
+        state_writer_thread = threading.Thread(
+            target=state_port_writer.run,
+            daemon=True,
+            name=f"port_state_writer_thread_{port_id}",
+        )
+        state_writer_thread.start()
+        self._port_state_writers[port_id] = (
+            state_writer_queue,
+            state_port_writer,
+            state_writer_thread,
+        )
 
     def get_port(self, port_id=None) -> WorkerPort:
         return list(self._ports.values())[0]
@@ -182,14 +200,12 @@ class OutputManager:
             )
 
     def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
+        element = PortStorageWriterElement(data_tuple=state.to_tuple())
         if port_id is None:
-            writers = self._state_writers.values()
-        elif port_id in self._state_writers:
-            writers = [self._state_writers[port_id]]
-        else:
-            return
-        for writer in writers:
-            writer.put_one(state.to_tuple())
+            for writer_queue, _, _ in self._port_state_writers.values():
+                writer_queue.put(element)
+        elif port_id in self._port_state_writers:
+            self._port_state_writers[port_id][0].put(element)
 
     def close_port_storage_writers(self) -> None:
         """
@@ -204,9 +220,11 @@ class OutputManager:
         for _, _, writer_thread in self._port_storage_writers.values():
             # This blocking call will wait for all the writer to finish commit
             writer_thread.join()
-        for state_writer in self._state_writers.values():
-            state_writer.close()
-        self._state_writers.clear()
+        for _, state_writer, _ in self._port_state_writers.values():
+            state_writer.stop()
+        for _, _, state_writer_thread in self._port_state_writers.values():
+            state_writer_thread.join()
+        self._port_state_writers.clear()
 
     def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) -> None:
         """
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index a39ac1dcb4..9455b92556 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -124,7 +124,7 @@ class OutputManager(
       : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
     mutable.HashMap()
 
-  private val stateWriters: mutable.HashMap[PortIdentity, BufferedItemWriter[Tuple]] =
+  private val stateWriterThreads: mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
     mutable.HashMap()
 
   /**
@@ -236,8 +236,8 @@ class OutputManager(
     })
   }
 
-  def saveStateToStorageIfNeeded(state: State): Unit = {
-    stateWriters.values.foreach(_.putOne(state.toTuple))
+  private def saveStateToStorageIfNeeded(state: State): Unit = {
+    stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple)))
   }
 
   /**
@@ -253,7 +253,10 @@ class OutputManager(
         writerThread.join()
       case None =>
     }
-    this.stateWriters.remove(outputPortId).foreach(_.close())
+    this.stateWriterThreads.remove(outputPortId).foreach { writerThread =>
+      writerThread.queue.put(Right(PortStorageWriterTerminateSignal))
+      writerThread.join()
+    }
   }
 
   def getPort(portId: PortIdentity): WorkerPort = ports(portId)
@@ -304,8 +307,9 @@ class OutputManager(
       ._1
       .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
       .asInstanceOf[BufferedItemWriter[Tuple]]
-    stateWriter.open()
-    this.stateWriters(portId) = stateWriter
+    val stateWriterThread = new OutputPortResultWriterThread(stateWriter)
+    this.stateWriterThreads(portId) = stateWriterThread
+    stateWriterThread.start()
   }
 
 }
diff --git a/amber/src/test/python/core/architecture/packaging/test_output_manager.py b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
index 8f1daf8052..dcf7ccde67 100644
--- a/amber/src/test/python/core/architecture/packaging/test_output_manager.py
+++ b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
@@ -15,15 +15,25 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock
 
 import pytest
 
 from core.architecture.packaging.output_manager import OutputManager
 from core.models.state import State
+from core.storage.runnables.port_storage_writer import PortStorageWriterElement
 from proto.org.apache.texera.amber.core import PortIdentity
 
 
+def _stub_state_writer(output_manager, port_id):
+    """Inject a (queue, writer, thread) triple as if a port were set up."""
+    queue = MagicMock()
+    writer = MagicMock()
+    thread = MagicMock()
+    output_manager._port_state_writers[port_id] = (queue, writer, thread)
+    return queue, writer, thread
+
+
 class TestSaveStateToStorageIfNeeded:
     @pytest.fixture
     def output_manager(self):
@@ -44,84 +54,54 @@ class TestSaveStateToStorageIfNeeded:
     def test_no_state_writers_is_a_noop(self, output_manager, state):
         # With no port set up, save_state_to_storage_if_needed must not
         # touch any writer.
-        output_manager.save_state_to_storage_if_needed(state)  # no-op, no exception
+        output_manager.save_state_to_storage_if_needed(state)  # no-op
 
     def test_unknown_port_id_is_a_noop(self, output_manager, state, port_a):
         output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
         # No assertion needed -- the absence of any writer means nothing
         # was attempted.
 
-    def test_writes_to_every_port_when_port_id_omitted(
+    def test_enqueues_to_every_port_when_port_id_omitted(
         self, output_manager, state, port_a, port_b
     ):
-        writer_a = MagicMock()
-        writer_b = MagicMock()
-        output_manager._state_writers[port_a] = writer_a
-        output_manager._state_writers[port_b] = writer_b
+        queue_a, _, _ = _stub_state_writer(output_manager, port_a)
+        queue_b, _, _ = _stub_state_writer(output_manager, port_b)
 
         output_manager.save_state_to_storage_if_needed(state)
 
-        writer_a.put_one.assert_called_once()
-        writer_b.put_one.assert_called_once()
-        # Long-lived writers must NOT be closed per state -- otherwise
-        # we'd be back to one Iceberg snapshot per state.
-        writer_a.close.assert_not_called()
-        writer_b.close.assert_not_called()
+        # Each port's writer queue receives one PortStorageWriterElement.
+        # Critically, save is non-blocking -- the call must not invoke
+        # put_one / close on the buffered writer directly (those happen
+        # off-thread).
+        assert queue_a.put.call_count == 1
+        assert queue_b.put.call_count == 1
+        assert isinstance(queue_a.put.call_args.args[0], PortStorageWriterElement)
+        assert isinstance(queue_b.put.call_args.args[0], PortStorageWriterElement)
 
-    def test_writes_only_to_selected_port_when_port_id_specified(
+    def test_enqueues_only_to_selected_port_when_port_id_specified(
         self, output_manager, state, port_a, port_b
     ):
-        writer_a = MagicMock()
-        writer_b = MagicMock()
-        output_manager._state_writers[port_a] = writer_a
-        output_manager._state_writers[port_b] = writer_b
+        queue_a, _, _ = _stub_state_writer(output_manager, port_a)
+        queue_b, _, _ = _stub_state_writer(output_manager, port_b)
 
         output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
 
-        writer_a.put_one.assert_called_once()
-        writer_b.put_one.assert_not_called()
-
-    def test_state_writer_is_opened_at_port_setup(self, output_manager, port_a):
-        # set_up_port_storage_writer should open the result document AND
-        # the state document, then cache the state writer for reuse.
-        result_doc = MagicMock()
-        state_doc = MagicMock()
-        state_writer = MagicMock()
-        state_doc.writer.return_value = state_writer
-
-        with patch(
-            "core.architecture.packaging.output_manager.DocumentFactory"
-        ) as mock_factory:
-            mock_factory.open_document.side_effect = [
-                (result_doc, MagicMock()),
-                (state_doc, MagicMock()),
-            ]
-
-            output_manager.set_up_port_storage_writer(
-                port_a, "vfs:///wf/0/exec/0/result/op-a"
-            )
-
-            opened = [c.args[0] for c in mock_factory.open_document.call_args_list]
-            assert opened == [
-                "vfs:///wf/0/exec/0/result/op-a",
-                "vfs:///wf/0/exec/0/state/op-a",
-            ]
-            state_writer.open.assert_called_once()
-            assert output_manager._state_writers[port_a] is state_writer
-
-    def test_close_port_storage_writers_flushes_state_writers(
+        assert queue_a.put.call_count == 1
+        queue_b.put.assert_not_called()
+
+    def test_close_port_storage_writers_stops_state_threads(
         self, output_manager, port_a, port_b
     ):
-        # After the port completes, the long-lived state writer's buffer
-        # must be flushed and the writer closed (one Iceberg commit per
-        # port instead of one per state).
-        writer_a = MagicMock()
-        writer_b = MagicMock()
-        output_manager._state_writers[port_a] = writer_a
-        output_manager._state_writers[port_b] = writer_b
+        # After the port completes, every state-writer thread must be
+        # stopped and joined so the buffered writer's close() (which
+        # flushes the final Iceberg commit) actually runs.
+        _, writer_a, thread_a = _stub_state_writer(output_manager, port_a)
+        _, writer_b, thread_b = _stub_state_writer(output_manager, port_b)
 
         output_manager.close_port_storage_writers()
 
-        writer_a.close.assert_called_once()
-        writer_b.close.assert_called_once()
-        assert output_manager._state_writers == {}
+        writer_a.stop.assert_called_once()
+        writer_b.stop.assert_called_once()
+        thread_a.join.assert_called_once()
+        thread_b.join.assert_called_once()
+        assert output_manager._port_state_writers == {}

Reply via email to