This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ba490eea3 feat: add state materialization across regions (#4490)
9ba490eea3 is described below

commit 9ba490eea3f49415bc11dca2dd1e7af1780f1074
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu May 14 18:48:42 2026 -0700

    feat: add state materialization across regions (#4490)
    
    ### What changes were proposed in this PR?
    This PR adds state materialization as a general mechanism for passing
    state across different regions.
    
    - materialize state as a separate storage object alongside result
    storage
    - store one serialized state per row
    - use the same cross-language format on Python and Scala/Java sides
    - let downstream regions read back both data and state when needed
    
    ### Any related issues, documentation, discussions?
    Closes #4489
    
    ### How was this PR tested?
    
    - added a Python round-trip test for materialized state storage in
    test_iceberg_document.py
    - added a Scala round-trip test for materialized state storage in
    IcebergDocumentSpec.scala
    - You can manually test with this workflow:
      [Test States.json](https://github.com/user-attachments/files/27779044/Test.States.json)
    
    ### Was this PR authored or co-authored using generative AI tooling?
    Generated-by: ChatGPT (Codex), Claude Code (Claude Opus 4.7)
    
    ---------
    
    Signed-off-by: Xinyuan Lin <[email protected]>
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
    Co-authored-by: Xiaozhen Liu <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
---
 .github/workflows/build.yml                        |  37 +++
 .../handlers/control/assign_port_handler.py        |   6 +-
 .../core/architecture/packaging/output_manager.py  |  63 ++++-
 amber/src/main/python/core/runnables/main_loop.py  |   1 +
 .../main/python/core/storage/document_factory.py   | 107 +++++----
 .../input_port_materialization_reader_runnable.py  |  21 +-
 .../src/main/python/core/storage/storage_config.py |   3 +
 .../main/python/core/storage/vfs_uri_factory.py    |  17 +-
 amber/src/main/python/texera_run_python_worker.py  |   2 +
 .../messaginglayer/OutputManager.scala             |  52 ++++-
 .../pythonworker/PythonWorkflowWorker.scala        |   1 +
 .../scheduling/CostBasedScheduleGenerator.scala    |   8 +-
 .../ExpansionGreedyScheduleGenerator.scala         |   4 +-
 .../scheduling/RegionExecutionCoordinator.scala    |  16 +-
 .../scheduling/config/PortConfig.scala             |  10 +-
 .../InputPortMaterializationReaderThread.scala     |  34 ++-
 ...d.scala => OutputPortStorageWriterThread.scala} |   2 +-
 .../worker/promisehandlers/AssignPortHandler.scala |   4 +-
 .../architecture/packaging/test_output_manager.py  | 107 +++++++++
 .../packaging/test_state_materialization_e2e.py    | 258 +++++++++++++++++++++
 .../test/python/core/runnables/test_main_loop.py   | 157 +++++++++++++
 .../core/storage/iceberg/test_iceberg_document.py  | 108 ++++++++-
 ...t_input_port_materialization_reader_runnable.py |  99 ++++++++
 .../python/core/storage/test_document_factory.py   | 134 +++++++++++
 .../pytexera/storage/test_large_binary_manager.py  |   1 +
 ...ala => OutputPortStorageWriterThreadSpec.scala} |  18 +-
 common/config/src/main/resources/storage.conf      |   3 +
 .../amber/config/EnvironmentalVariable.scala       |   1 +
 .../apache/texera/amber/config/StorageConfig.scala |   3 +
 .../amber/core/storage/DocumentFactory.scala       |   2 +
 .../texera/amber/core/storage/VFSURIFactory.scala  |  22 +-
 .../amber/core/storage/VFSURIFactorySpec.scala     |  23 +-
 .../result/iceberg/IcebergDocumentSpec.scala       |  83 ++++++-
 .../result/iceberg/IcebergTableStatsSpec.scala     |  22 +-
 34 files changed, 1287 insertions(+), 142 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index b583f59725..8b0a5ea522 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -633,6 +633,43 @@ jobs:
           flags: python
           fail_ci_if_error: false
 
+  python-state-materialization-mac:
+    # Diagnostic leg: cross-region state materialization is reported to
+    # fail on macOS while working on Windows / Linux. The main `python`
+    # job above runs only on ubuntu-latest because it depends on a
+    # postgres service container (service containers don't work on
+    # macOS runners). The state-materialization integration tests use
+    # an in-process sqlite-backed SqlCatalog instead, so we can run
+    # them on macOS without postgres infra. If they fail here but pass
+    # in the main `python` job, we've reproduced the macOS-specific
+    # regression in CI.
+    if: ${{ inputs.run_python }}
+    runs-on: macos-latest
+    steps:
+      - name: Checkout Texera
+        uses: actions/checkout@v5
+        with:
+          ref: ${{ inputs.checkout_ref || github.sha }}
+          fetch-depth: 0
+      - name: Prepare backport workspace
+        if: ${{ inputs.backport_target_branch != '' }}
+        run: bash ./.github/scripts/prepare-backport-checkout.sh "${{ 
inputs.backport_target_branch }}" "${{ inputs.backport_commit_range }}"
+      - name: Set up Python 3.12
+        uses: actions/setup-python@v6
+        with:
+          python-version: "3.12"
+      - name: Install dependencies
+        run: |
+          python -m pip install uv
+          if [ -f amber/requirements.txt ]; then uv pip install --system 
--index-strategy unsafe-best-match -r amber/requirements.txt; fi
+          if [ -f amber/operator-requirements.txt ]; then uv pip install 
--system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; 
fi
+          if [ -f amber/dev-requirements.txt ]; then uv pip install --system 
-r amber/dev-requirements.txt; fi
+      - name: Run state-materialization integration tests
+        run: |
+          cd amber && pytest -sv \
+            
src/test/python/core/architecture/packaging/test_state_materialization_e2e.py \
+            
src/test/python/core/runnables/test_main_loop.py::TestMainLoop::test_process_start_channel_persists_produce_state_on_start_output
+
   agent-service:
     if: ${{ inputs.run_agent_service }}
     name: ${{ format('agent-service{0} ({1})', inputs.job_name_suffix, 
matrix.os) }}
diff --git 
a/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py
 
b/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py
index 73ebad26b3..71f1e7f96b 100644
--- 
a/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py
+++ 
b/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py
@@ -44,10 +44,10 @@ class AssignPortHandler(ControlHandler):
                     channel_id=channel_id, port_id=req.port_id
                 )
         else:
-            storage_uri = None
+            storage_uri_base = None
             if len(req.storage_uris) > 0 and req.storage_uris[0]:
-                storage_uri = req.storage_uris[0]
+                storage_uri_base = req.storage_uris[0]
             self.context.output_manager.add_output_port(
-                req.port_id, Schema(raw_schema=req.schema), storage_uri
+                req.port_id, Schema(raw_schema=req.schema), storage_uri_base
             )
         return EmptyReturn()
diff --git 
a/amber/src/main/python/core/architecture/packaging/output_manager.py 
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index bf4afbf396..b85e3e39bf 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -45,6 +45,7 @@ from core.models import Tuple, Schema, StateFrame
 from core.models.payload import DataPayload, DataFrame
 from core.models.state import State
 from core.storage.document_factory import DocumentFactory
+from core.storage.vfs_uri_factory import VFSURIFactory
 from core.storage.runnables.port_storage_writer import (
     PortStorageWriter,
     PortStorageWriterElement,
@@ -87,6 +88,10 @@ class OutputManager:
             PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
         ] = dict()
 
+        self._port_state_writers: typing.Dict[
+            PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
+        ] = dict()
+
     def is_missing_output_ports(self):
         """
         This method is only used for ensuring correct region execution.
@@ -107,26 +112,30 @@ class OutputManager:
         self,
         port_id: PortIdentity,
         schema: Schema,
-        storage_uri: typing.Optional[str] = None,
+        storage_uri_base: typing.Optional[str] = None,
     ) -> None:
         if port_id.id is None:
             port_id.id = 0
         if port_id.internal is None:
             port_id.internal = False
 
-        if storage_uri is not None:
-            self.set_up_port_storage_writer(port_id, storage_uri)
+        if storage_uri_base is not None:
+            self.set_up_port_storage_writer(port_id, storage_uri_base)
 
         # each port can only be added and initialized once.
         if port_id not in self._ports:
             self._ports[port_id] = WorkerPort(schema)
 
-    def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: 
str):
+    def set_up_port_storage_writer(self, port_id: PortIdentity, 
storage_uri_base: str):
         """
         Create a separate thread for saving output tuples of a port
-        to storage in batch.
+        to storage in batch, and open a long-lived buffered writer for
+        state materialization on the same port. `storage_uri_base` is the
+        port's base URI; the result and state URIs are derived from it.
         """
-        document, _ = DocumentFactory.open_document(storage_uri)
+        document, _ = DocumentFactory.open_document(
+            VFSURIFactory.result_uri(storage_uri_base)
+        )
         buffered_item_writer = 
document.writer(str(get_worker_index(self.worker_id)))
         writer_queue = Queue()
         port_storage_writer = PortStorageWriter(
@@ -144,6 +153,29 @@ class OutputManager:
             writer_thread,
         )
 
+        state_document, _ = DocumentFactory.open_document(
+            VFSURIFactory.state_uri(storage_uri_base)
+        )
+        state_buffered_item_writer = state_document.writer(
+            str(get_worker_index(self.worker_id))
+        )
+        state_writer_queue = Queue()
+        state_port_writer = PortStorageWriter(
+            buffered_item_writer=state_buffered_item_writer,
+            queue=state_writer_queue,
+        )
+        state_writer_thread = threading.Thread(
+            target=state_port_writer.run,
+            daemon=True,
+            name=f"port_state_writer_thread_{port_id}",
+        )
+        state_writer_thread.start()
+        self._port_state_writers[port_id] = (
+            state_writer_queue,
+            state_port_writer,
+            state_writer_thread,
+        )
+
     def get_port(self, port_id=None) -> WorkerPort:
         return list(self._ports.values())[0]
 
@@ -171,6 +203,20 @@ class OutputManager:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
+    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> 
None:
+        # When port_id is omitted the same state row is fanned out to
+        # every output port's state table. This mirrors the
+        # broadcast-to-all-workers behavior on the emit side: state is
+        # shared context, not per-key data, so every downstream operator
+        # (and every worker reading the materialization) needs the full
+        # set.
+        element = PortStorageWriterElement(data_tuple=state.to_tuple())
+        if port_id is None:
+            for writer_queue, _, _ in self._port_state_writers.values():
+                writer_queue.put(element)
+        elif port_id in self._port_state_writers:
+            self._port_state_writers[port_id][0].put(element)
+
     def close_port_storage_writers(self) -> None:
         """
         Flush the buffers of port storage writers and wait for all the
@@ -184,6 +230,11 @@ class OutputManager:
         for _, _, writer_thread in self._port_storage_writers.values():
             # This blocking call will wait for all the writer to finish commit
             writer_thread.join()
+        for _, state_writer, _ in self._port_state_writers.values():
+            state_writer.stop()
+        for _, _, state_writer_thread in self._port_state_writers.values():
+            state_writer_thread.join()
+        self._port_state_writers.clear()
 
     def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) 
-> None:
         """
diff --git a/amber/src/main/python/core/runnables/main_loop.py 
b/amber/src/main/python/core/runnables/main_loop.py
index ab35cda81b..1334af12bf 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -202,6 +202,7 @@ class MainLoop(StoppableQueueBlockingRunnable):
                         payload=batch,
                     )
                 )
+            
self.context.output_manager.save_state_to_storage_if_needed(output_state)
 
     def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
         """
diff --git a/amber/src/main/python/core/storage/document_factory.py 
b/amber/src/main/python/core/storage/document_factory.py
index 9b686ab66b..bd690ceb59 100644
--- a/amber/src/main/python/core/storage/document_factory.py
+++ b/amber/src/main/python/core/storage/document_factory.py
@@ -61,30 +61,35 @@ class DocumentFactory:
         if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
             _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
 
-            if resource_type in {VFSResourceType.RESULT}:
-                storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
-
-                # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
-                # field name encoding
-                iceberg_schema = amber_schema_to_iceberg_schema(schema)
-
-                create_table(
-                    IcebergCatalogInstance.get_instance(),
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    iceberg_schema,
-                    override_if_exists=True,
-                )
-
-                return IcebergDocument[Tuple](
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    iceberg_schema,
-                    amber_tuples_to_arrow_table,
-                    arrow_table_to_amber_tuples,
-                )
-            else:
-                raise ValueError(f"Resource type {resource_type} is not 
supported")
+            match resource_type:
+                case VFSResourceType.RESULT:
+                    namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+                case VFSResourceType.STATE:
+                    namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+                case _:
+                    raise ValueError(f"Resource type {resource_type} is not 
supported")
+
+            storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+            # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
+            # field name encoding
+            iceberg_schema = amber_schema_to_iceberg_schema(schema)
+
+            create_table(
+                IcebergCatalogInstance.get_instance(),
+                namespace,
+                storage_key,
+                iceberg_schema,
+                override_if_exists=True,
+            )
+
+            return IcebergDocument[Tuple](
+                namespace,
+                storage_key,
+                iceberg_schema,
+                amber_tuples_to_arrow_table,
+                arrow_table_to_amber_tuples,
+            )
+
         else:
             raise NotImplementedError(
                 f"Unsupported URI scheme: {parsed_uri.scheme} for creating the 
document"
@@ -96,30 +101,36 @@ class DocumentFactory:
         if parsed_uri.scheme == "vfs":
             _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
 
-            if resource_type in {VFSResourceType.RESULT}:
-                storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
-
-                table = load_table_metadata(
-                    IcebergCatalogInstance.get_instance(),
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                )
-
-                if table is None:
-                    raise ValueError("No storage is found for the given URI")
-
-                amber_schema = Schema(table.schema().as_arrow())
-
-                document = IcebergDocument(
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    table.schema(),
-                    amber_tuples_to_arrow_table,
-                    arrow_table_to_amber_tuples,
-                )
-                return document, amber_schema
-            else:
-                raise ValueError(f"Resource type {resource_type} is not 
supported")
+            match resource_type:
+                case VFSResourceType.RESULT:
+                    namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+                case VFSResourceType.STATE:
+                    namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+                case _:
+                    raise ValueError(f"Resource type {resource_type} is not 
supported")
+
+            storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+
+            table = load_table_metadata(
+                IcebergCatalogInstance.get_instance(),
+                namespace,
+                storage_key,
+            )
+
+            if table is None:
+                raise ValueError("No storage is found for the given URI")
+
+            amber_schema = Schema(table.schema().as_arrow())
+
+            document = IcebergDocument(
+                namespace,
+                storage_key,
+                table.schema(),
+                amber_tuples_to_arrow_table,
+                arrow_table_to_amber_tuples,
+            )
+            return document, amber_schema
+
         else:
             raise NotImplementedError(
                 f"Unsupported URI scheme: {parsed_uri.scheme} for opening the 
document"
diff --git 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index 6122bbb8b9..3e0e2d48ab 100644
--- 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -34,9 +34,10 @@ from 
core.architecture.sendsemantics.range_based_shuffle_partitioner import (
 from core.architecture.sendsemantics.round_robin_partitioner import (
     RoundRobinPartitioner,
 )
-from core.models import Tuple, InternalQueue, DataFrame, DataPayload
+from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, 
StateFrame
 from core.models.internal_queue import DataElement, ECMElement
 from core.storage.document_factory import DocumentFactory
+from core.storage.vfs_uri_factory import VFSURIFactory
 from core.util import Stoppable, get_one_of
 from core.util.runnable import Runnable
 from core.util.virtual_identity import get_from_actor_id_for_input_port_storage
@@ -132,14 +133,28 @@ class InputPortMaterializationReaderRunnable(Runnable, 
Stoppable):
         emits an EndChannel ECM. Use the same partitioner implementation as 
that in
         output manager, where a tuple is batched by the partitioner and only
         selected as the input of this worker according to the partitioner.
+
+        States and tuples are persisted to separate tables, so the original
+        interleaving is lost and replay has to pick an order: we replay states
+        first because downstream operators typically need their state set up
+        before they process the incoming tuples. Every state is broadcast to
+        every downstream worker -- no partitioner filtering, unlike the tuple
+        loop. State is shared context (e.g. config / counters), not per-key
+        data, so each worker needs the full set.
         """
         try:
             self.materialization, self.tuple_schema = 
DocumentFactory.open_document(
-                self.uri
+                VFSURIFactory.result_uri(self.uri)
             )
             self.emit_ecm("StartChannel", 
EmbeddedControlMessageType.NO_ALIGNMENT)
-            storage_iterator = self.materialization.get()
 
+            state_document, _ = DocumentFactory.open_document(
+                VFSURIFactory.state_uri(self.uri)
+            )
+            for state_row in state_document.get():
+                self.emit_payload(StateFrame(State.from_tuple(state_row)))
+
+            storage_iterator = self.materialization.get()
             # Iterate and process tuples.
             for tup in storage_iterator:
                 if self._stopped:
diff --git a/amber/src/main/python/core/storage/storage_config.py 
b/amber/src/main/python/core/storage/storage_config.py
index 0e47bdb71a..8233590987 100644
--- a/amber/src/main/python/core/storage/storage_config.py
+++ b/amber/src/main/python/core/storage/storage_config.py
@@ -32,6 +32,7 @@ class StorageConfig:
     ICEBERG_REST_CATALOG_URI = None
     ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None
     ICEBERG_TABLE_RESULT_NAMESPACE = None
+    ICEBERG_TABLE_STATE_NAMESPACE = None
     ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
     ICEBERG_TABLE_COMMIT_BATCH_SIZE = None
 
@@ -51,6 +52,7 @@ class StorageConfig:
         rest_catalog_uri,
         rest_catalog_warehouse_name,
         table_result_namespace,
+        table_state_namespace,
         directory_path,
         commit_batch_size,
         s3_endpoint,
@@ -71,6 +73,7 @@ class StorageConfig:
         cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name
 
         cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
+        cls.ICEBERG_TABLE_STATE_NAMESPACE = table_state_namespace
         cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
         cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
 
diff --git a/amber/src/main/python/core/storage/vfs_uri_factory.py 
b/amber/src/main/python/core/storage/vfs_uri_factory.py
index de0c5db56e..883450abf2 100644
--- a/amber/src/main/python/core/storage/vfs_uri_factory.py
+++ b/amber/src/main/python/core/storage/vfs_uri_factory.py
@@ -34,6 +34,7 @@ class VFSResourceType(str, Enum):
     RESULT = "result"
     RUNTIME_STATISTICS = "runtimeStatistics"
     CONSOLE_MESSAGES = "consoleMessages"
+    STATE = "state"
 
 
 class VFSURIFactory:
@@ -88,12 +89,22 @@ class VFSURIFactory:
         )
 
     @staticmethod
-    def create_result_uri(workflow_id, execution_id, global_port_id) -> str:
-        """Creates a URI pointing to a result storage."""
-        base_uri = (
+    def create_port_base_uri(workflow_id, execution_id, global_port_id) -> str:
+        """Base URI for a port. Result and state URIs derive from it via
+        `result_uri` / `state_uri`.
+        """
+        return (
             f"{VFSURIFactory.VFS_FILE_URI_SCHEME}:///wid/{workflow_id.id}"
             f"/eid/{execution_id.id}/globalportid/"
             f"{serialize_global_port_identity(global_port_id)}"
         )
 
+    @staticmethod
+    def result_uri(base_uri: str) -> str:
+        """The result-resource URI under a port base URI."""
         return f"{base_uri}/{VFSResourceType.RESULT.value}"
+
+    @staticmethod
+    def state_uri(base_uri: str) -> str:
+        """The state-resource URI under a port base URI."""
+        return f"{base_uri}/{VFSResourceType.STATE.value}"
diff --git a/amber/src/main/python/texera_run_python_worker.py 
b/amber/src/main/python/texera_run_python_worker.py
index 8687298f81..9b21fa5334 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -52,6 +52,7 @@ if __name__ == "__main__":
         iceberg_rest_catalog_uri,
         iceberg_rest_catalog_warehouse_name,
         iceberg_table_namespace,
+        iceberg_table_state_namespace,
         iceberg_file_storage_directory_path,
         iceberg_table_commit_batch_size,
         s3_endpoint,
@@ -68,6 +69,7 @@ if __name__ == "__main__":
         iceberg_rest_catalog_uri,
         iceberg_rest_catalog_warehouse_name,
         iceberg_table_namespace,
+        iceberg_table_state_namespace,
         iceberg_file_storage_directory_path,
         iceberg_table_commit_batch_size,
         s3_endpoint,
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 095315590f..030fa3a3bb 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -20,7 +20,7 @@
 package org.apache.texera.amber.engine.architecture.messaginglayer
 
 import org.apache.texera.amber.core.state.State
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
 import org.apache.texera.amber.core.storage.model.BufferedItemWriter
 import org.apache.texera.amber.core.tuple._
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
@@ -33,7 +33,7 @@ import 
org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.
 import org.apache.texera.amber.engine.architecture.sendsemantics.partitioners._
 import 
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
 import org.apache.texera.amber.engine.architecture.worker.managers.{
-  OutputPortResultWriterThread,
+  OutputPortStorageWriterThread,
   PortStorageWriterTerminateSignal
 }
 import org.apache.texera.amber.engine.common.AmberLogging
@@ -121,7 +121,10 @@ class OutputManager(
     mutable.HashMap[(PhysicalLink, ActorVirtualIdentity), 
NetworkOutputBuffer]()
 
   private val outputPortResultWriterThreads
-      : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
+      : mutable.HashMap[PortIdentity, OutputPortStorageWriterThread] =
+    mutable.HashMap()
+
+  private val stateWriterThreads: mutable.HashMap[PortIdentity, 
OutputPortStorageWriterThread] =
     mutable.HashMap()
 
   /**
@@ -191,19 +194,20 @@ class OutputManager(
 
   def emitState(state: State): Unit = {
     networkOutputBuffers.foreach(kv => kv._2.sendState(state))
+    saveStateToStorageIfNeeded(state)
   }
 
-  def addPort(portId: PortIdentity, schema: Schema, storageURIOption: 
Option[URI]): Unit = {
+  def addPort(portId: PortIdentity, schema: Schema, storageURIBaseOption: 
Option[URI]): Unit = {
     // each port can only be added and initialized once.
     if (this.ports.contains(portId)) {
       return
     }
     this.ports(portId) = WorkerPort(schema)
 
-    // if a storage URI is provided, set up a storage writer thread
-    storageURIOption match {
-      case Some(storageUri) => setupOutputStorageWriterThread(portId, 
storageUri)
-      case None             => // No need to add a writer
+    // if a storage URI base is provided, set up storage writer threads
+    storageURIBaseOption match {
+      case Some(portBaseURI) => setupOutputStorageWriterThread(portId, 
portBaseURI)
+      case None              => // No need to add a writer
     }
   }
 
@@ -232,6 +236,15 @@ class OutputManager(
     })
   }
 
+  private def saveStateToStorageIfNeeded(state: State): Unit = {
+    // The same state row is fanned out to every output port's state
+    // table. This mirrors the broadcast-to-all-workers behavior on the
+    // emit side: state is shared context, not per-key data, so every
+    // downstream operator (and every worker reading the materialization)
+    // needs the full set.
+    stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple)))
+  }
+
   /**
     * Singal the port storage writer to flush the remaining buffer and wait 
for commits to finish so that
     * the output port is properly completed. If the output port does not need 
storage, no action will be done.
@@ -251,7 +264,11 @@ class OutputManager(
         writerThread.getFailure.foreach(throw _)
       case None =>
     }
-
+    this.stateWriterThreads.remove(outputPortId).foreach { writerThread =>
+      writerThread.queue.put(Right(PortStorageWriterTerminateSignal))
+      writerThread.join()
+      writerThread.getFailure.foreach(throw _)
+    }
   }
 
   def getPort(portId: PortIdentity): WorkerPort = ports(portId)
@@ -285,9 +302,9 @@ class OutputManager(
     ports.head._1
   }
 
-  private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: 
URI): Unit = {
+  private def setupOutputStorageWriterThread(portId: PortIdentity, 
portBaseURI: URI): Unit = {
     val bufferedItemWriter = DocumentFactory
-      .openDocument(storageUri)
+      .openDocument(VFSURIFactory.resultURI(portBaseURI))
       ._1
       .writer(
         VirtualIdentityUtils
@@ -300,9 +317,20 @@ class OutputManager(
           .toString
       )
       .asInstanceOf[BufferedItemWriter[Tuple]]
-    val writerThread = new OutputPortResultWriterThread(bufferedItemWriter)
+    val writerThread = new OutputPortStorageWriterThread(bufferedItemWriter)
     this.outputPortResultWriterThreads(portId) = writerThread
     writerThread.start()
+
+    // The state document is provisioned alongside the result document
+    // by RegionExecutionCoordinator, so it is always present.
+    val stateWriter = DocumentFactory
+      .openDocument(VFSURIFactory.stateURI(portBaseURI))
+      ._1
+      .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+      .asInstanceOf[BufferedItemWriter[Tuple]]
+    val stateWriterThread = new OutputPortStorageWriterThread(stateWriter)
+    this.stateWriterThreads(portId) = stateWriterThread
+    stateWriterThread.start()
   }
 
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index 4ff5ff15ae..3358e31e65 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -187,6 +187,7 @@ class PythonWorkflowWorker(
         if (isRest) StorageConfig.icebergRESTCatalogUri else "",
         if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else "",
         StorageConfig.icebergTableResultNamespace,
+        StorageConfig.icebergTableStateNamespace,
         StorageConfig.fileStorageDirectoryPath.toString,
         StorageConfig.icebergTableCommitBatchSize.toString,
         StorageConfig.s3Endpoint,
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 401ccddc0a..43e8d281ce 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -20,7 +20,7 @@
 package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.texera.amber.config.ApplicationConfig
-import org.apache.texera.amber.core.storage.VFSURIFactory.createResultURI
+import org.apache.texera.amber.core.storage.VFSURIFactory.createPortBaseURI
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, PhysicalOpIdentity}
 import org.apache.texera.amber.core.workflow._
 import org.apache.texera.amber.engine.architecture.scheduling.SchedulingUtils.replaceVertex
@@ -174,12 +174,12 @@ class CostBasedScheduleGenerator(
         // Allocate an URI for each of these output ports
         val outputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] =
           outputPortIdsNeedingStorage.map { gpid =>
-            val outputWriterURI = createResultURI(
+            val portBaseURI = createPortBaseURI(
               workflowId = workflowContext.workflowId,
               executionId = workflowContext.executionId,
               globalPortId = gpid
             )
-            gpid -> OutputPortConfig(outputWriterURI)
+            gpid -> OutputPortConfig(portBaseURI)
           }.toMap
 
         val resourceConfig = ResourceConfig(portConfigs = outputPortConfigs)
@@ -237,7 +237,7 @@ class CostBasedScheduleGenerator(
                     s"the outout port $globalOutputPortId has not been assigned a URI yet."
                 )
               )
-              .storageURI
+              .storageURIBase
 
             // Group all available URIs of this input port together
             acc.updated(
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
index 4bb8933896..304e1496f8 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
 
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.texera.amber.core.WorkflowRuntimeException
-import org.apache.texera.amber.core.storage.VFSURIFactory.createResultURI
+import org.apache.texera.amber.core.storage.VFSURIFactory.createPortBaseURI
 import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
 import org.apache.texera.amber.core.workflow.{
   GlobalPortIdentity,
@@ -331,7 +331,7 @@ class ExpansionGreedyScheduleGenerator(
 
   private def getStorageURIFromGlobalOutputPortId(outputPortId: GlobalPortIdentity) = {
     assert(!outputPortId.input)
-    createResultURI(
+    createPortBaseURI(
       workflowId = workflowContext.workflowId,
       executionId = workflowContext.executionId,
       globalPortId = outputPortId
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 2971e4c4f4..5a9df11b58 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -21,7 +21,8 @@ package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.pekko.pattern.gracefulStop
 import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, Return, Throw, Timer}
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
 import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp}
@@ -465,7 +466,7 @@ class RegionExecutionCoordinator(
                               opId = physicalOp.id,
                               portId = outputPortId
                             ) =>
-                          cfg.storageURI.toString
+                          cfg.storageURIBase.toString
                       }
                       .getOrElse("")
                     Some(
@@ -568,18 +569,21 @@ class RegionExecutionCoordinator(
   ): Unit = {
     portConfigs.foreach {
       case (outputPortId, portConfig) =>
-        val storageUriToAdd = portConfig.storageURI
-        val (_, eid, _, _) = decodeURI(storageUriToAdd)
+        val portBaseURI = portConfig.storageURIBase
+        val resultURI = VFSURIFactory.resultURI(portBaseURI)
+        val stateURI = VFSURIFactory.stateURI(portBaseURI)
         val schemaOptional =
           region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
-        DocumentFactory.createDocument(storageUriToAdd, schema)
+        DocumentFactory.createDocument(resultURI, schema)
+        DocumentFactory.createDocument(stateURI, State.schema)
         if (!isRestart) {
+          val (_, eid, _, _) = decodeURI(resultURI)
           WorkflowExecutionsResource.insertOperatorPortResultUri(
             eid = eid,
             globalPortId = outputPortId,
-            uri = storageUriToAdd
+            uri = resultURI
           )
         }
     }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala
index b4a1e058b4..56743ae095 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala
@@ -31,9 +31,13 @@ sealed trait PortConfig {
   def storageURIs: List[URI]
 }
 
-/** An output port requires exactly one materialization URI. */
-final case class OutputPortConfig(storageURI: URI) extends PortConfig {
-  override val storageURIs: List[URI] = List(storageURI)
+/**
+  * An output port requires exactly one materialization base URI. Result and
+  * state URIs hang off it via `VFSURIFactory.resultURI` / `stateURI`; this
+  * field is *not* a URI you can pass straight to `DocumentFactory`.
+  */
+final case class OutputPortConfig(storageURIBase: URI) extends PortConfig {
+  override val storageURIs: List[URI] = List(storageURIBase)
 }
 
 /**
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index 10fbbc44a2..428d9fb48c 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -21,7 +21,8 @@ package org.apache.texera.amber.engine.architecture.worker.managers
 
 import io.grpc.MethodDescriptor
 import org.apache.texera.amber.config.ApplicationConfig
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.virtualidentity.{
@@ -45,7 +46,11 @@ import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{
   DPInputQueueElement,
   FIFOMessageElement
 }
-import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage}
+import org.apache.texera.amber.engine.common.ambermessage.{
+  DataFrame,
+  StateFrame,
+  WorkflowFIFOMessage
+}
 import org.apache.texera.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage
 
 import java.net.URI
@@ -78,14 +83,35 @@ class InputPortMaterializationReaderThread(
   def finished: Boolean = isFinished.get()
 
   /**
-    * Read from the materialization stoage, and mimcs the behavior of an upstream worker's output manager.
+    * Read from the materialization storage, and mimics the behavior of an upstream worker's output manager.
+    *
+    * States and tuples are persisted to separate tables, so the original
+    * interleaving is lost and replay has to pick an order: we replay states
+    * first because downstream operators typically need their state set up
+    * before they process the incoming tuples. Every state is broadcast to
+    * every downstream worker -- no partitioner filtering, unlike the tuple
+    * loop. State is shared context (e.g. config / counters), not per-key
+    * data, so each worker needs the full set.
     */
   override def run(): Unit = {
     // Notify the input port of start of input channel
     emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT)
     try {
+      val stateDocument =
+        DocumentFactory
+          .openDocument(VFSURIFactory.stateURI(uri))
+          ._1
+          .asInstanceOf[VirtualDocument[Tuple]]
+      val stateReadIterator = stateDocument.get()
+      while (stateReadIterator.hasNext) {
+        val state = State.fromTuple(stateReadIterator.next())
+        inputMessageQueue.put(
+          FIFOMessageElement(WorkflowFIFOMessage(channelId, getSequenceNumber, StateFrame(state)))
+        )
+      }
+
       val materialization: VirtualDocument[Tuple] = DocumentFactory
-        .openDocument(uri)
+        .openDocument(VFSURIFactory.resultURI(uri))
         ._1
         .asInstanceOf[VirtualDocument[Tuple]]
       val storageReadIterator = materialization.get()
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThread.scala
similarity index 98%
rename from amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
rename to amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThread.scala
index 4223d920da..bcabde0089 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThread.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 sealed trait TerminateSignal
 case object PortStorageWriterTerminateSignal extends TerminateSignal
 
-class OutputPortResultWriterThread(
+class OutputPortStorageWriterThread(
     bufferedItemWriter: BufferedItemWriter[Tuple]
 ) extends Thread {
 
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala
index fe959733ab..9e2c7f7c2b 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala
@@ -58,11 +58,11 @@ trait AssignPortHandler {
         dp.stateManager.assertState(READY, RUNNING, PAUSED)
       }
     } else {
-      val storageURIOption: Option[URI] = msg.storageUris.head match {
+      val storageURIBaseOption: Option[URI] = msg.storageUris.head match {
         case ""        => None
         case uriString => Some(URI.create(uriString))
       }
-      dp.outputManager.addPort(msg.portId, schema, storageURIOption)
+      dp.outputManager.addPort(msg.portId, schema, storageURIBaseOption)
     }
     EmptyReturn()
   }
diff --git a/amber/src/test/python/core/architecture/packaging/test_output_manager.py b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
new file mode 100644
index 0000000000..dcf7ccde67
--- /dev/null
+++ b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from core.architecture.packaging.output_manager import OutputManager
+from core.models.state import State
+from core.storage.runnables.port_storage_writer import PortStorageWriterElement
+from proto.org.apache.texera.amber.core import PortIdentity
+
+
+def _stub_state_writer(output_manager, port_id):
+    """Inject a (queue, writer, thread) triple as if a port were set up."""
+    queue = MagicMock()
+    writer = MagicMock()
+    thread = MagicMock()
+    output_manager._port_state_writers[port_id] = (queue, writer, thread)
+    return queue, writer, thread
+
+
+class TestSaveStateToStorageIfNeeded:
+    @pytest.fixture
+    def output_manager(self):
+        return OutputManager(worker_id="Worker:WF0-test-main-0")
+
+    @pytest.fixture
+    def port_a(self):
+        return PortIdentity(id=0, internal=False)
+
+    @pytest.fixture
+    def port_b(self):
+        return PortIdentity(id=1, internal=False)
+
+    @pytest.fixture
+    def state(self):
+        return State({"loop_counter": 1, "i": 2})
+
+    def test_no_state_writers_is_a_noop(self, output_manager, state):
+        # With no port set up, save_state_to_storage_if_needed must not
+        # touch any writer.
+        output_manager.save_state_to_storage_if_needed(state)  # no-op
+
+    def test_unknown_port_id_is_a_noop(self, output_manager, state, port_a):
+        output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
+        # No assertion needed -- the absence of any writer means nothing
+        # was attempted.
+
+    def test_enqueues_to_every_port_when_port_id_omitted(
+        self, output_manager, state, port_a, port_b
+    ):
+        queue_a, _, _ = _stub_state_writer(output_manager, port_a)
+        queue_b, _, _ = _stub_state_writer(output_manager, port_b)
+
+        output_manager.save_state_to_storage_if_needed(state)
+
+        # Each port's writer queue receives one PortStorageWriterElement.
+        # Critically, save is non-blocking -- the call must not invoke
+        # put_one / close on the buffered writer directly (those happen
+        # off-thread).
+        assert queue_a.put.call_count == 1
+        assert queue_b.put.call_count == 1
+        assert isinstance(queue_a.put.call_args.args[0], PortStorageWriterElement)
+        assert isinstance(queue_b.put.call_args.args[0], PortStorageWriterElement)
+
+    def test_enqueues_only_to_selected_port_when_port_id_specified(
+        self, output_manager, state, port_a, port_b
+    ):
+        queue_a, _, _ = _stub_state_writer(output_manager, port_a)
+        queue_b, _, _ = _stub_state_writer(output_manager, port_b)
+
+        output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
+
+        assert queue_a.put.call_count == 1
+        queue_b.put.assert_not_called()
+
+    def test_close_port_storage_writers_stops_state_threads(
+        self, output_manager, port_a, port_b
+    ):
+        # After the port completes, every state-writer thread must be
+        # stopped and joined so the buffered writer's close() (which
+        # flushes the final Iceberg commit) actually runs.
+        _, writer_a, thread_a = _stub_state_writer(output_manager, port_a)
+        _, writer_b, thread_b = _stub_state_writer(output_manager, port_b)
+
+        output_manager.close_port_storage_writers()
+
+        writer_a.stop.assert_called_once()
+        writer_b.stop.assert_called_once()
+        thread_a.join.assert_called_once()
+        thread_b.join.assert_called_once()
+        assert output_manager._port_state_writers == {}
diff --git a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
new file mode 100644
index 0000000000..9d1fd30698
--- /dev/null
+++ b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
@@ -0,0 +1,258 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+End-to-end integration test for cross-region state materialization.
+
+The unit tests for `OutputManager.save_state_to_storage_if_needed` and
+`InputPortMaterializationReaderRunnable.run` mock out the iceberg layer,
+so a regression in the writer/storage/reader join is invisible to them.
+This test wires:
+
+    OutputManager.set_up_port_storage_writer(port, base_uri)
+       → real PortStorageWriter thread
+       → real IcebergTableWriter (sqlite-backed SqlCatalog)
+       → state document at VFSURIFactory.state_uri(base_uri)
+       → InputPortMaterializationReaderRunnable.run()
+       → DataElement(StateFrame) on the consumer's input queue
+
+and asserts that a state put through `save_state_to_storage_if_needed`
+on the producer side actually arrives at the consumer's queue, with the
+same payload.
+"""
+
+import tempfile
+import threading
+import uuid
+
+import pytest
+from pyiceberg.catalog.sql import SqlCatalog
+
+from core.architecture.packaging.output_manager import OutputManager
+from core.models import State, StateFrame
+from core.models.internal_queue import DataElement, InternalQueue
+from core.storage.document_factory import DocumentFactory
+from core.storage.iceberg.iceberg_catalog_instance import IcebergCatalogInstance
+from core.storage.runnables.input_port_materialization_reader_runnable import (
+    InputPortMaterializationReaderRunnable,
+)
+from core.storage.storage_config import StorageConfig
+from core.storage.vfs_uri_factory import VFSURIFactory
+from proto.org.apache.texera.amber.core import (
+    ActorVirtualIdentity,
+    ChannelIdentity,
+    ExecutionIdentity,
+    GlobalPortIdentity,
+    OperatorIdentity,
+    PhysicalOpIdentity,
+    PortIdentity,
+    WorkflowIdentity,
+)
+from proto.org.apache.texera.amber.engine.architecture.sendsemantics import (
+    OneToOnePartitioning,
+    Partitioning,
+)
+
+
+# Module-level scratch dir for the sqlite catalog + iceberg warehouse.
+# We don't initialize `StorageConfig` here: other test modules (e.g.
+# test_iceberg_document.py) also call `StorageConfig.initialize` at
+# import time, and the class rejects re-initialization with
+# RuntimeError. Whichever module gets collected first wins; we adopt
+# its namespaces below.
+_WAREHOUSE_DIR = tempfile.mkdtemp(prefix="texera-state-e2e-warehouse-")
+
+
[email protected](scope="module", autouse=True)
+def sqlite_iceberg_catalog():
+    """Inject a sqlite-backed SqlCatalog so the test runs without external
+    iceberg infra (postgres/minio).
+
+    Module-scoped so all tests in this file share one warehouse, and so
+    namespace creation only happens once. We save/restore the original
+    `IcebergCatalogInstance` singleton so other test modules that expect
+    a real postgres-backed catalog (e.g. test_iceberg_document.py) are
+    not affected by our replacement.
+    """
+    # Some other test module may have initialized StorageConfig already
+    # (it has a single-init lock). If nothing has initialized it yet,
+    # do it here with arbitrary values -- we replace the catalog
+    # instance below so the postgres/rest fields are never exercised.
+    if not StorageConfig._initialized:
+        StorageConfig.initialize(
+            catalog_type="postgres",
+            postgres_uri_without_scheme="unused",
+            postgres_username="unused",
+            postgres_password="unused",
+            rest_catalog_uri="unused",
+            rest_catalog_warehouse_name="unused",
+            table_result_namespace="operator-port-result",
+            table_state_namespace="operator-port-state",
+            directory_path=_WAREHOUSE_DIR,
+            commit_batch_size=4096,
+            s3_endpoint="unused",
+            s3_region="unused",
+            s3_auth_username="unused",
+            s3_auth_password="unused",
+        )
+
+    original_instance = IcebergCatalogInstance._instance
+    db_path = f"{_WAREHOUSE_DIR}/catalog.sqlite"
+    catalog = SqlCatalog(
+        "texera_iceberg_e2e",
+        **{
+            "uri": f"sqlite:///{db_path}",
+            "warehouse": f"file://{_WAREHOUSE_DIR}",
+        },
+    )
+    # Adopt whatever namespaces StorageConfig already has -- those are
+    # the ones DocumentFactory will route into.
+    catalog.create_namespace_if_not_exists(StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE)
+    catalog.create_namespace_if_not_exists(StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE)
+    IcebergCatalogInstance.replace_instance(catalog)
+    try:
+        yield catalog
+    finally:
+        IcebergCatalogInstance.replace_instance(original_instance)
+
+
+def _fresh_base_uri() -> str:
+    """A unique port-base URI per test so tables don't collide."""
+    return VFSURIFactory.create_port_base_uri(
+        WorkflowIdentity(id=0),
+        ExecutionIdentity(id=0),
+        GlobalPortIdentity(
+            op_id=PhysicalOpIdentity(
+                logical_op_id=OperatorIdentity(id=f"e2e-{uuid.uuid4().hex}"),
+                layer_name="main",
+            ),
+            port_id=PortIdentity(id=0, internal=False),
+            input=False,
+        ),
+    )
+
+
+def test_state_written_by_output_manager_is_replayed_by_reader():
+    """Producer side writes a state via OutputManager; consumer side reads
+    it via InputPortMaterializationReaderRunnable. The state must arrive
+    on the consumer's input queue intact.
+    """
+    base_uri = _fresh_base_uri()
+    port_id = PortIdentity(id=0, internal=False)
+    worker_schema_for_result = State.SCHEMA  # producer-side: only state matters
+
+    # 1. RegionExecutionCoordinator's responsibility: provision result +
+    # state documents at the port base URI before any worker starts.
+    # We emulate that here.
+    DocumentFactory.create_document(
+        VFSURIFactory.result_uri(base_uri), worker_schema_for_result
+    )
+    DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA)
+
+    # 2. Producer side: spin up an OutputManager, set up real state +
+    # result writer threads against the iceberg storage.
+    producer = OutputManager(worker_id="Worker:WF0-test-producer-main-0")
+    producer.add_output_port(
+        port_id, schema=worker_schema_for_result, storage_uri_base=base_uri
+    )
+
+    # 3. Drive a state through the producer-side path.
+    state = State({"flag": True, "loop_counter": 7, "name": "outer"})
+    producer.save_state_to_storage_if_needed(state)
+
+    # 4. Force the writer threads to flush + commit by closing them.
+    # Without this, the iceberg buffer holds the state in memory and
+    # nothing is durable yet.
+    producer.close_port_storage_writers()
+
+    # 5. Consumer side: spin up the materialization reader against the
+    # same base URI. Each reader needs a partitioning even when no real
+    # downstream worker exists -- supply a OneToOnePartitioning whose
+    # only receiver is the consumer worker itself.
+    consumer_worker = ActorVirtualIdentity(name="consumer-worker-0")
+    consumer_queue = InternalQueue()
+    partitioning = Partitioning(
+        one_to_one_partitioning=OneToOnePartitioning(
+            batch_size=400,
+            channels=[
+                ChannelIdentity(
+                    from_worker_id=ActorVirtualIdentity(name="producer-worker-0"),
+                    to_worker_id=consumer_worker,
+                    is_control=False,
+                )
+            ],
+        )
+    )
+    reader = InputPortMaterializationReaderRunnable(
+        uri=base_uri,
+        queue=consumer_queue,
+        worker_actor_id=consumer_worker,
+        partitioning=partitioning,
+    )
+
+    # Run the reader on a worker thread so we can time out cleanly if
+    # something goes wrong.
+    reader_thread = threading.Thread(target=reader.run, daemon=True)
+    reader_thread.start()
+    reader_thread.join(timeout=30)
+    assert not reader_thread.is_alive(), "reader did not finish within timeout"
+    assert reader.finished(), "reader exited but did not mark itself finished"
+
+    # 6. Drain the consumer's queue and find the StateFrame(s).
+    state_frames: list[State] = []
+    while not consumer_queue.is_empty():
+        elem = consumer_queue.get()
+        if isinstance(elem, DataElement) and isinstance(elem.payload, StateFrame):
+            state_frames.append(elem.payload.frame)
+
+    assert len(state_frames) == 1, (
+        f"expected exactly one State to flow through writer→iceberg→reader; "
+        f"got {len(state_frames)}: {state_frames}"
+    )
+    assert state_frames[0] == state, (
+        f"replayed state did not match what was written; "
+        f"wrote={state}, read={state_frames[0]}"
+    )
+
+
+def test_state_table_persists_across_writer_close():
+    """Independently verify the iceberg state table contains the row.
+    If this passes but the reader test above fails, the bug is in the
+    reader / consumer wiring; if this fails, the bug is in the writer /
+    storage layer.
+    """
+    base_uri = _fresh_base_uri()
+    port_id = PortIdentity(id=0, internal=False)
+
+    DocumentFactory.create_document(VFSURIFactory.result_uri(base_uri), State.SCHEMA)
+    DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA)
+
+    producer = OutputManager(worker_id="Worker:WF0-test-producer2-main-0")
+    producer.add_output_port(port_id, schema=State.SCHEMA, storage_uri_base=base_uri)
+
+    state = State({"flag": False, "checkpoint": 42})
+    producer.save_state_to_storage_if_needed(state)
+    producer.close_port_storage_writers()
+
+    # Read directly from the iceberg state document, bypassing the reader.
+    state_document, _ = DocumentFactory.open_document(VFSURIFactory.state_uri(base_uri))
+    rows = list(state_document.get())
+    assert len(rows) == 1, (
+        f"expected exactly one row in the iceberg state table after the "
+        f"writer was closed; got {len(rows)} rows"
+    )
+    assert State.from_tuple(rows[0]) == state
diff --git a/amber/src/test/python/core/runnables/test_main_loop.py b/amber/src/test/python/core/runnables/test_main_loop.py
index 400a7f2a90..c32f45b888 100644
--- a/amber/src/test/python/core/runnables/test_main_loop.py
+++ b/amber/src/test/python/core/runnables/test_main_loop.py
@@ -1393,6 +1393,163 @@ class TestMainLoop:
         assert second_output.payload.frame["value"] == 42
         assert second_output.payload.frame["port"] == 0
 
+    @pytest.mark.timeout(2)
+    def test_process_input_state_persists_output_state_to_storage(
+        self,
+        main_loop,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        # process_input_state must invoke save_state_to_storage_if_needed
+        # with the freshly emitted output state, so every state that flows
+        # downstream is also durable on the upstream output port.
+        class DummyExecutor:
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                return State({"value": state["value"] + 1, "port": port})
+
+        saved_states: list[State] = []
+        main_loop.context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))],
+        )
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "save_state_to_storage_if_needed",
+            lambda state: saved_states.append(state),
+        )
+
+        def fake_switch_context():
+            current_input_state = (
+                main_loop.context.state_processing_manager.current_input_state
+            )
+            if current_input_state is not None:
+                main_loop.context.state_processing_manager.current_output_state = (
+                    DummyExecutor.process_state(current_input_state, 0)
+                )
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        main_loop._process_state(State({"value": 1}))
+        main_loop._process_state(State({"value": 41}))
+
+        # Each input state produced one output state, so both must have
+        # been persisted in order.
+        assert [s["value"] for s in saved_states] == [2, 42]
+        assert all(s["port"] == 0 for s in saved_states)
+
+    @pytest.mark.timeout(2)
+    def test_process_start_channel_persists_produce_state_on_start_output(
+        self,
+        main_loop,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        # The state emitted by an executor's `produce_state_on_start` must
+        # also be persisted via `save_state_to_storage_if_needed`, so a
+        # downstream worker in a different region can replay it from the
+        # iceberg state table.
+        #
+        # This is the integration path exercised in real workflows when
+        # users override `produce_state_on_start`. `_process_start_channel`
+        # → `process_input_state` → DataProcessor.process_internal_marker
+        # (StartChannel) → executor.produce_state_on_start → _set_output_state
+        # → MainLoop reads output state → emit + save.
+        on_start_state = State({"flag": True, "loop_counter": 0})
+
+        class DummyExecutor:
+            @staticmethod
+            def produce_state_on_start(port: int) -> State:
+                # Tag with port so we can also assert the right port id
+                # was forwarded.
+                return State({**on_start_state, "port": port})
+
+        saved_states: list[State] = []
+        main_loop.context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))],
+        )
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "save_state_to_storage_if_needed",
+            lambda state: saved_states.append(state),
+        )
+        # _send_ecm_to_data_channels touches output_manager state we don't
+        # set up here; for this test the ECM forwarding is irrelevant -- the
+        # SAVE path is what we're pinning. Stub it.
+        monkeypatch.setattr(main_loop, "_send_ecm_to_data_channels", lambda *_: None)
+
+        # Simulate the DP-thread side: when MainLoop yields, the DataProcessor
+        # consumes the StartChannel marker and runs produce_state_on_start.
+        def fake_switch_context():
+            from core.models.internal_marker import StartChannel as _StartChannel
+
+            tpm = main_loop.context.tuple_processing_manager
+            if isinstance(tpm.current_internal_marker, _StartChannel):
+                # mimic DataProcessor.process_internal_marker(StartChannel)
+                produced = DummyExecutor.produce_state_on_start(port=0)
+                main_loop.context.state_processing_manager.current_output_state = (
+                    produced
+                )
+                tpm.current_internal_marker = None  # consumed
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        # Drive the path: this is exactly what `_process_ecm` calls when a
+        # StartChannel ECM arrives and the start_channel handler has set
+        # the marker.
+        from core.models.internal_marker import StartChannel
+
+        main_loop.context.tuple_processing_manager.current_internal_marker = (
+            StartChannel()
+        )
+        main_loop._process_start_channel()
+
+        # The state produced by produce_state_on_start must be persisted to
+        # iceberg via save_state_to_storage_if_needed. Without this, a
+        # downstream worker in a different region cannot observe the state.
+        assert len(saved_states) == 1, (
+            f"produce_state_on_start emitted a state but it was not persisted "
+            f"to storage. saved_states={saved_states}"
+        )
+        assert saved_states[0]["flag"] is True
+        assert saved_states[0]["loop_counter"] == 0
+        assert saved_states[0]["port"] == 0
+
+    @pytest.mark.timeout(2)
+    def test_process_input_state_does_not_save_when_no_output(
+        self,
+        main_loop,
+        monkeypatch,
+    ):
+        # When the executor returns no output state (process_state returned
+        # None), save_state_to_storage_if_needed must not be called -- no
+        # state means nothing to materialize.
+        save_calls: list[State] = []
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [],
+        )
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "save_state_to_storage_if_needed",
+            lambda state: save_calls.append(state),
+        )
+        # Pretend DataProc consumed the input but produced no output.
+        monkeypatch.setattr(main_loop, "_switch_context", lambda: None)
+
+        main_loop._process_state(State({"value": 1}))
+
+        assert save_calls == []
+
     @pytest.mark.timeout(2)
     def test_main_loop_thread_can_process_state(
         self,
diff --git a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
index 327b907306..bdba00ba0c 100644
--- a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
@@ -24,6 +24,7 @@ from concurrent.futures import as_completed
 from concurrent.futures.thread import ThreadPoolExecutor
 
 from core.models import Schema, Tuple
+from core.models.state import State
 from core.storage.document_factory import DocumentFactory
 from core.storage.storage_config import StorageConfig
 from core.storage.vfs_uri_factory import VFSURIFactory
@@ -49,6 +50,7 @@ StorageConfig.initialize(
     rest_catalog_uri="http://localhost:8181/catalog/",
     rest_catalog_warehouse_name="texera",
     table_result_namespace="operator-port-result",
+    table_state_namespace="operator-port-state",
     directory_path=tempfile.mkdtemp(prefix="texera-iceberg-warehouse-"),
     commit_batch_size=4096,
     s3_endpoint="http://localhost:9000",
@@ -81,17 +83,21 @@ class TestIcebergDocument:
         with a random operator id
         """
         operator_uuid = str(uuid.uuid4()).replace("-", "")
-        uri = VFSURIFactory.create_result_uri(
-            WorkflowIdentity(id=0),
-            ExecutionIdentity(id=0),
-            GlobalPortIdentity(
-                op_id=PhysicalOpIdentity(
-                    logical_op_id=OperatorIdentity(id=f"test-table-{operator_uuid}"),
-                    layer_name="main",
+        uri = VFSURIFactory.result_uri(
+            VFSURIFactory.create_port_base_uri(
+                WorkflowIdentity(id=0),
+                ExecutionIdentity(id=0),
+                GlobalPortIdentity(
+                    op_id=PhysicalOpIdentity(
+                        logical_op_id=OperatorIdentity(
+                            id=f"test-table-{operator_uuid}"
+                        ),
+                        layer_name="main",
+                    ),
+                    port_id=PortIdentity(id=0),
+                    input=False,
                 ),
-                port_id=PortIdentity(id=0),
-                input=False,
-            ),
+            )
         )
         DocumentFactory.create_document(uri, amber_schema)
         document, _ = DocumentFactory.open_document(uri)
@@ -322,3 +328,85 @@ class TestIcebergDocument:
         assert iceberg_document.get_count() == len(sample_items), (
             "get_count should return the same number as the length of sample_items"
         )
+
+    def test_state_materialization_round_trip(self):
+        operator_uuid = str(uuid.uuid4()).replace("-", "")
+        base_uri = VFSURIFactory.create_port_base_uri(
+            WorkflowIdentity(id=0),
+            ExecutionIdentity(id=0),
+            GlobalPortIdentity(
+                op_id=PhysicalOpIdentity(
+                    logical_op_id=OperatorIdentity(id=f"test-state-{operator_uuid}"),
+                    layer_name="main",
+                ),
+                port_id=PortIdentity(id=0),
+                input=False,
+            ),
+        )
+        state_uri = VFSURIFactory.state_uri(base_uri)
+        DocumentFactory.create_document(state_uri, State.SCHEMA)
+        document, _ = DocumentFactory.open_document(state_uri)
+
+        state = State(
+            {
+                "loop_counter": 3,
+                "name": "outer-loop",
+                "payload": b"\x00\x01state-bytes",
+                "nested": {"enabled": True, "values": [1, 2, 3]},
+            }
+        )
+
+        writer = document.writer(str(uuid.uuid4()))
+        writer.open()
+        writer.put_one(state.to_tuple())
+        writer.close()
+
+        stored_rows = list(document.get())
+        assert len(stored_rows) == 1
+        assert State.from_tuple(stored_rows[0]) == state
+
+    def test_multiple_states_materialize_as_rows_in_one_table(self):
+        operator_uuid = str(uuid.uuid4()).replace("-", "")
+        base_uri = VFSURIFactory.create_port_base_uri(
+            WorkflowIdentity(id=0),
+            ExecutionIdentity(id=0),
+            GlobalPortIdentity(
+                op_id=PhysicalOpIdentity(
+                    logical_op_id=OperatorIdentity(
+                        id=f"test-multiple-states-{operator_uuid}"
+                    ),
+                    layer_name="main",
+                ),
+                port_id=PortIdentity(id=0),
+                input=False,
+            ),
+        )
+        state_uri = VFSURIFactory.state_uri(base_uri)
+        DocumentFactory.create_document(state_uri, State.SCHEMA)
+        document, _ = DocumentFactory.open_document(state_uri)
+
+        states = [
+            State({"loop_counter": 0, "i": 1, "payload": b"first"}),
+            State(
+                {
+                    "loop_counter": 1,
+                    "i": 2,
+                    "payload": b"second",
+                    "nested": {"values": [3, 4]},
+                }
+            ),
+        ]
+
+        writer = document.writer(str(uuid.uuid4()))
+        writer.open()
+        for state in states:
+            writer.put_one(state.to_tuple())
+        writer.close()
+
+        stored_rows = list(document.get())
+        assert len(stored_rows) == len(states)
+        actual_states = sorted(
+            [State.from_tuple(row) for row in stored_rows],
+            key=lambda state: state["loop_counter"],
+        )
+        assert actual_states == states
diff --git a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
new file mode 100644
index 0000000000..5016c2df2f
--- /dev/null
+++ b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from core.models import State, StateFrame
+from core.models.internal_queue import DataElement
+from core.models.schema import Schema
+from core.storage.runnables.input_port_materialization_reader_runnable import (
+    InputPortMaterializationReaderRunnable,
+)
+from proto.org.apache.texera.amber.core import (
+    ActorVirtualIdentity,
+    ChannelIdentity,
+)
+
+
+class TestRunStateReadingBlock:
+    """Cover the state-reading block in run() that opens the state
+    document and emits its rows as StateFrames directly to the input
+    queue (no partitioner filtering -- state is broadcast to every
+    worker).
+    """
+
+    @pytest.fixture
+    def me(self):
+        return ActorVirtualIdentity(name="me")
+
+    @pytest.fixture
+    def runnable(self, me):
+        instance = InputPortMaterializationReaderRunnable.__new__(
+            InputPortMaterializationReaderRunnable
+        )
+        instance.uri = "vfs:///wf/0/exec/0/result/op-a"
+        instance.worker_actor_id = me
+        instance.tuple_schema = Schema(raw_schema={"x": "INTEGER"})
+        instance._stopped = False
+        instance._finished = False
+        instance.channel_id = ChannelIdentity(me, me, is_control=False)
+        instance.queue = MagicMock()
+        instance.partitioner = MagicMock()
+        # No tuple-batches and no ECM-flush payloads in these tests.
+        instance.partitioner.flush.return_value = []
+        return instance
+
+    def test_state_rows_are_emitted_as_state_frames(self, runnable):
+        state_a = State({"loop_counter": 0})
+        state_b = State({"loop_counter": 1})
+
+        # The state document yields opaque tuples; from_tuple deserializes
+        # them. Patch from_tuple so we don't have to wire a real
+        # serialization.
+        result_doc = MagicMock()
+        result_doc.get.return_value = iter([])  # No materialized tuples.
+        state_doc = MagicMock()
+        state_doc.get.return_value = iter(["row-a", "row-b"])
+
+        with (
+            patch(
+                "core.storage.runnables.input_port_materialization_reader_runnable.DocumentFactory"
+            ) as mock_factory,
+            patch.object(State, "from_tuple") as mock_from_tuple,
+        ):
+            mock_factory.open_document.side_effect = [
+                (result_doc, runnable.tuple_schema),
+                (state_doc, None),
+            ]
+            mock_from_tuple.side_effect = [state_a, state_b]
+
+            runnable.run()
+
+        # Two StateFrames must have been put on the queue, in order.
+        # The state replay must NOT route through the partitioner --
+        # state is shared context, broadcast to every worker.
+        runnable.partitioner.flush_state.assert_not_called()
+        state_frames = [
+            call.args[0]
+            for call in runnable.queue.put.call_args_list
+            if isinstance(call.args[0], DataElement)
+            and isinstance(call.args[0].payload, StateFrame)
+        ]
+        assert [sf.payload.frame for sf in state_frames] == [state_a, state_b]
+        assert runnable._finished is True
diff --git a/amber/src/test/python/core/storage/test_document_factory.py b/amber/src/test/python/core/storage/test_document_factory.py
new file mode 100644
index 0000000000..859c004024
--- /dev/null
+++ b/amber/src/test/python/core/storage/test_document_factory.py
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from core.models import Schema
+from core.storage.document_factory import DocumentFactory
+from core.storage.storage_config import StorageConfig
+from core.storage.vfs_uri_factory import VFSResourceType
+
+
+# Avoid initializing the real config (only initializable once per process).
+StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE = "test-result-ns"
+StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE = "test-state-ns"
+
+VFS_URI = "vfs:///wid/0/eid/0/opid/test/main/0/0/result"
+
+
+@pytest.fixture
+def schema():
+    return Schema(raw_schema={"x": "INTEGER"})
+
+
+def _decode_returning(resource_type):
+    """Helper: build a VFSURIFactory.decode_uri side_effect."""
+    return lambda _uri: (None, None, None, resource_type)
+
+
+@patch("core.storage.document_factory.IcebergDocument")
+@patch("core.storage.document_factory.amber_schema_to_iceberg_schema")
+@patch("core.storage.document_factory.create_table")
+@patch("core.storage.document_factory.IcebergCatalogInstance")
+@patch("core.storage.document_factory.VFSURIFactory")
+class TestCreateDocumentNamespaceRouting:
+    def test_state_resource_type_uses_state_namespace(
+        self, mock_vfs, _icb, mock_create_table, _amber_schema, _doc, schema
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = _decode_returning(VFSResourceType.STATE)
+
+        DocumentFactory.create_document(VFS_URI, schema)
+
+        args, _ = mock_create_table.call_args
+        assert args[1] == StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+
+    def test_result_resource_type_uses_result_namespace(
+        self, mock_vfs, _icb, mock_create_table, _amber_schema, _doc, schema
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = _decode_returning(VFSResourceType.RESULT)
+
+        DocumentFactory.create_document(VFS_URI, schema)
+
+        args, _ = mock_create_table.call_args
+        assert args[1] == StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+
+    def test_unsupported_resource_type_raises_value_error(
+        self, mock_vfs, _icb, _create_table, _amber_schema, _doc, schema
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        # CONSOLE_MESSAGES has no namespace mapping in the Python factory.
+        mock_vfs.decode_uri.side_effect = _decode_returning(
+            VFSResourceType.CONSOLE_MESSAGES
+        )
+
+        with pytest.raises(ValueError, match="not supported"):
+            DocumentFactory.create_document(VFS_URI, schema)
+
+
+def test_create_document_rejects_non_vfs_scheme(schema):
+    with pytest.raises(NotImplementedError, match="Unsupported URI scheme"):
+        DocumentFactory.create_document("file:///tmp/x", schema)
+
+
+@patch("core.storage.document_factory.IcebergDocument")
+@patch("core.storage.document_factory.Schema")
+@patch("core.storage.document_factory.load_table_metadata")
+@patch("core.storage.document_factory.IcebergCatalogInstance")
+@patch("core.storage.document_factory.VFSURIFactory")
+class TestOpenDocumentNamespaceRouting:
+    @staticmethod
+    def _stub_table():
+        table = MagicMock()
+        table.schema.return_value.as_arrow.return_value = MagicMock()
+        return table
+
+    def test_state_resource_type_uses_state_namespace(
+        self, mock_vfs, _icb, mock_load, _schema_cls, _doc
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = _decode_returning(VFSResourceType.STATE)
+        mock_load.return_value = self._stub_table()
+
+        DocumentFactory.open_document(VFS_URI)
+
+        args, _ = mock_load.call_args
+        assert args[1] == StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+
+    def test_unsupported_resource_type_raises_value_error(
+        self, mock_vfs, _icb, _load, _schema_cls, _doc
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = _decode_returning(
+            VFSResourceType.CONSOLE_MESSAGES
+        )
+
+        with pytest.raises(ValueError, match="not supported"):
+            DocumentFactory.open_document(VFS_URI)
+
+    def test_missing_table_raises_value_error(
+        self, mock_vfs, _icb, mock_load, _schema_cls, _doc
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = _decode_returning(VFSResourceType.STATE)
+        mock_load.return_value = None
+
+        with pytest.raises(ValueError, match="No storage is found"):
+            DocumentFactory.open_document(VFS_URI)
diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
index 64c7080e52..1942e91f8b 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
@@ -34,6 +34,7 @@ class TestLargeBinaryManager:
                 rest_catalog_uri="http://localhost:8181/catalog/",
                 rest_catalog_warehouse_name="texera",
                 table_result_namespace="test",
+                table_state_namespace="test-state",
                 directory_path="/tmp/test",
                 commit_batch_size=1000,
                 s3_endpoint="http://localhost:9000",
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThreadSpec.scala
similarity index 90%
rename from amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala
rename to amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThreadSpec.scala
index 31d8c41611..d7ab0c1831 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThreadSpec.scala
@@ -32,7 +32,7 @@ import org.scalatest.flatspec.AnyFlatSpec
 
 import scala.collection.mutable
 
-class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
+class OutputPortStorageWriterThreadSpec extends AnyFlatSpec {
 
   private class StubWriter(
       onPutOne: () => Unit = () => (),
@@ -51,9 +51,9 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
 
   private def throwing(msg: String): () => Unit = () => throw new RuntimeException(msg)
 
-  "OutputPortResultWriterThread" should "leave getFailure empty on a clean run" in {
+  "OutputPortStorageWriterThread" should "leave getFailure empty on a clean run" in {
     val writer = new StubWriter()
-    val thread = new OutputPortResultWriterThread(writer)
+    val thread = new OutputPortStorageWriterThread(writer)
     thread.start()
     thread.queue.put(Right(PortStorageWriterTerminateSignal))
     thread.join()
@@ -63,7 +63,7 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
 
   it should "capture a close() exception in getFailure so the worker can re-throw" in {
     val writer = new StubWriter(onClose = throwing("test close failure"))
-    val thread = new OutputPortResultWriterThread(writer)
+    val thread = new OutputPortStorageWriterThread(writer)
     thread.start()
     thread.queue.put(Right(PortStorageWriterTerminateSignal))
     thread.join()
@@ -73,7 +73,7 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
 
   it should "capture a putOne exception and still call close()" in {
     val writer = new StubWriter(onPutOne = throwing("test putOne failure"))
-    val thread = new OutputPortResultWriterThread(writer)
+    val thread = new OutputPortStorageWriterThread(writer)
     thread.start()
     thread.queue.put(Left(null.asInstanceOf[Tuple]))
     thread.queue.put(Right(PortStorageWriterTerminateSignal))
@@ -89,7 +89,7 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
       onPutOne = throwing("test putOne failure"),
       onClose = throwing("test close failure")
     )
-    val thread = new OutputPortResultWriterThread(writer)
+    val thread = new OutputPortStorageWriterThread(writer)
     thread.start()
     thread.queue.put(Left(null.asInstanceOf[Tuple]))
     thread.queue.put(Right(PortStorageWriterTerminateSignal))
@@ -110,14 +110,14 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
   private def installWriterThread(
       manager: OutputManager,
       portId: PortIdentity,
-      thread: OutputPortResultWriterThread
+      thread: OutputPortStorageWriterThread
   ): Unit = {
     val field = classOf[OutputManager]
       .getDeclaredField("outputPortResultWriterThreads")
     field.setAccessible(true)
     field
       .get(manager)
-      .asInstanceOf[mutable.HashMap[PortIdentity, OutputPortResultWriterThread]]
+      .asInstanceOf[mutable.HashMap[PortIdentity, OutputPortStorageWriterThread]]
       .put(portId, thread)
   }
 
@@ -130,7 +130,7 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
     )
     val portId = PortIdentity()
     val failingWriter = new StubWriter(onClose = throwing("test close failure"))
-    val failingThread = new OutputPortResultWriterThread(failingWriter)
+    val failingThread = new OutputPortStorageWriterThread(failingWriter)
     failingThread.start()
     installWriterThread(outputManager, portId, failingThread)
     val ex = intercept[RuntimeException] {
diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf
index 29d5f7be51..0682109f19 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -58,6 +58,9 @@ storage {
             runtime-statistics-namespace = "workflow-runtime-statistics"
             runtime-statistics-namespace = ${?STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE}
 
+            state-namespace = "operator-port-state"
+            state-namespace = ${?STORAGE_ICEBERG_TABLE_STATE_NAMESPACE}
+
             commit {
                 batch-size = 4096 # decide the buffer size of our IcebergTableWriter
                 batch-size = ${?STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE}
diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
index 9ec52bba65..123c56505e 100644
--- a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
+++ b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
@@ -67,6 +67,7 @@ object EnvironmentalVariable {
     "STORAGE_ICEBERG_TABLE_CONSOLE_MESSAGES_NAMESPACE"
   val ENV_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE =
     "STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE"
+  val ENV_ICEBERG_TABLE_STATE_NAMESPACE = "STORAGE_ICEBERG_TABLE_STATE_NAMESPACE"
   val ENV_ICEBERG_TABLE_COMMIT_BATCH_SIZE = "STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE"
   val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES = "STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES"
   val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS = "STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS"
diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
index 728e3c0c2d..07447cfdbe 100644
--- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
+++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
@@ -54,6 +54,8 @@ object StorageConfig {
     conf.getString("storage.iceberg.table.console-messages-namespace")
   val icebergTableRuntimeStatisticsNamespace: String =
     conf.getString("storage.iceberg.table.runtime-statistics-namespace")
+  val icebergTableStateNamespace: String =
+    conf.getString("storage.iceberg.table.state-namespace")
   val icebergTableCommitBatchSize: Int =
     conf.getInt("storage.iceberg.table.commit.batch-size")
   val icebergTableCommitNumRetries: Int =
@@ -111,6 +113,7 @@ object StorageConfig {
     "STORAGE_ICEBERG_TABLE_CONSOLE_MESSAGES_NAMESPACE"
   val ENV_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE =
     "STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE"
+  val ENV_ICEBERG_TABLE_STATE_NAMESPACE = "STORAGE_ICEBERG_TABLE_STATE_NAMESPACE"
   val ENV_ICEBERG_TABLE_COMMIT_BATCH_SIZE = "STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE"
   val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES = "STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES"
   val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS = "STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS"
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index 15949ef471..00f6c70ba7 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -72,6 +72,7 @@ object DocumentFactory {
           case RESULT             => StorageConfig.icebergTableResultNamespace
           case CONSOLE_MESSAGES   => StorageConfig.icebergTableConsoleMessagesNamespace
           case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace
+          case STATE              => StorageConfig.icebergTableStateNamespace
           case _ =>
             throw new IllegalArgumentException(s"Resource type $resourceType is not supported")
         }
@@ -119,6 +120,7 @@ object DocumentFactory {
           case RESULT             => StorageConfig.icebergTableResultNamespace
           case CONSOLE_MESSAGES   => 
StorageConfig.icebergTableConsoleMessagesNamespace
           case RUNTIME_STATISTICS => 
StorageConfig.icebergTableRuntimeStatisticsNamespace
+          case STATE              => StorageConfig.icebergTableStateNamespace
           case _ =>
             throw new IllegalArgumentException(s"Resource type $resourceType 
is not supported")
         }
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
index 0fbee64457..291c31896b 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
@@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration {
   val RESULT: Value = Value("result")
   val RUNTIME_STATISTICS: Value = Value("runtimeStatistics")
   val CONSOLE_MESSAGES: Value = Value("consoleMessages")
+  val STATE: Value = Value("state")
 }
 
 object VFSURIFactory {
@@ -83,18 +84,25 @@ object VFSURIFactory {
   }
 
   /**
-    * Create a URI pointing to a result storage
+    * Create the base URI for a port. Result and state URIs are derived
+    * from this base via `resultURI` / `stateURI`.
     */
-  def createResultURI(
+  def createPortBaseURI(
       workflowId: WorkflowIdentity,
       executionId: ExecutionIdentity,
       globalPortId: GlobalPortIdentity
-  ): URI = {
-    val baseUri =
-      s"$VFS_FILE_URI_SCHEME:///wid/${workflowId.id}/eid/${executionId.id}/globalportid/${globalPortId.serializeAsString}"
+  ): URI =
+    new URI(
+      s"$VFS_FILE_URI_SCHEME:///wid/${workflowId.id}/eid/${executionId.id}" +
+        s"/globalportid/${globalPortId.serializeAsString}"
+    )
 
-    new URI(s"$baseUri/${VFSResourceType.RESULT.toString.toLowerCase}")
-  }
+  def resultURI(baseURI: URI): URI = appendResource(baseURI, VFSResourceType.RESULT)
+
+  def stateURI(baseURI: URI): URI = appendResource(baseURI, VFSResourceType.STATE)
+
+  private def appendResource(baseURI: URI, resourceType: VFSResourceType.Value): URI =
+    new URI(s"$baseURI/${resourceType.toString.toLowerCase}")
 
   /**
     * Create a URI pointing to runtime statistics
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
index 6fbe35873a..0b8ae4a19c 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
@@ -42,23 +42,30 @@ class VFSURIFactorySpec extends AnyFlatSpec {
       input = true
     )
 
-  "VFSURIFactory.createResultURI" should "include workflow, execution, port, 
and the result resource type" in {
-    val uri = VFSURIFactory.createResultURI(workflowId, executionId, portId)
-    assert(uri.getScheme == VFSURIFactory.VFS_FILE_URI_SCHEME)
-    val path = uri.getPath
+  "VFSURIFactory.createPortBaseURI" should "include workflow, execution, and 
port segments without a resource type" in {
+    val baseURI = VFSURIFactory.createPortBaseURI(workflowId, executionId, 
portId)
+    assert(baseURI.getScheme == VFSURIFactory.VFS_FILE_URI_SCHEME)
+    val path = baseURI.getPath
     assert(path.contains("/wid/7"))
     assert(path.contains("/eid/11"))
     assert(path.contains("/globalportid/"))
-    assert(path.endsWith("/result"))
+    assert(!path.endsWith("/result"))
+    assert(!path.endsWith("/state"))
   }
 
-  it should "round-trip through decodeURI" in {
-    val uri = VFSURIFactory.createResultURI(workflowId, executionId, portId)
-    val (wid, eid, globalPortIdOpt, resourceType) = 
VFSURIFactory.decodeURI(uri)
+  "VFSURIFactory.resultURI / stateURI" should "append the resource segment and 
round-trip through decodeURI" in {
+    val baseURI = VFSURIFactory.createPortBaseURI(workflowId, executionId, 
portId)
+    val resultURI = VFSURIFactory.resultURI(baseURI)
+    val stateURI = VFSURIFactory.stateURI(baseURI)
+    assert(resultURI.getPath.endsWith("/result"))
+    assert(stateURI.getPath.endsWith("/state"))
+
+    val (wid, eid, globalPortIdOpt, resourceType) = 
VFSURIFactory.decodeURI(resultURI)
     assert(wid == workflowId)
     assert(eid == executionId)
     assert(globalPortIdOpt.contains(portId))
     assert(resourceType == VFSResourceType.RESULT)
+    assert(VFSURIFactory.decodeURI(stateURI)._4 == VFSResourceType.STATE)
   }
 
   "VFSURIFactory.createRuntimeStatisticsURI" should "produce a 
runtimeStatistics URI without an opid segment" in {
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index eb259fed58..b92562eeb7 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -20,6 +20,7 @@
 package org.apache.texera.amber.storage.result.iceberg
 
 import org.apache.texera.amber.config.StorageConfig
+import org.apache.texera.amber.core.state.State
 import org.apache.texera.amber.core.storage.model.{VirtualDocument, 
VirtualDocumentSpec}
 import org.apache.texera.amber.core.storage.{DocumentFactory, 
IcebergCatalogInstance, VFSURIFactory}
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
@@ -51,6 +52,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] 
with BeforeAndAfter
   var deserde: (IcebergSchema, Record) => Tuple = _
   var catalog: Catalog = _
   val tableNamespace = "test_namespace"
+  var baseURI: URI = _
   var uri: URI = _
 
   override def beforeAll(): Unit = {
@@ -79,7 +81,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] 
with BeforeAndAfter
 
   override def beforeEach(): Unit = {
     // Generate a unique table name for each test
-    uri = VFSURIFactory.createResultURI(
+    baseURI = VFSURIFactory.createPortBaseURI(
       WorkflowIdentity(0),
       ExecutionIdentity(0),
       GlobalPortIdentity(
@@ -91,6 +93,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] 
with BeforeAndAfter
         PortIdentity()
       )
     )
+    uri = VFSURIFactory.resultURI(baseURI)
     DocumentFactory.createDocument(uri, amberSchema)
     super.beforeEach()
   }
@@ -141,6 +144,84 @@ class IcebergDocumentSpec extends 
VirtualDocumentSpec[Tuple] with BeforeAndAfter
     }
   }
 
+  it should "round trip materialized state documents" in {
+    val stateUri = VFSURIFactory.stateURI(baseURI)
+    DocumentFactory.createDocument(stateUri, State.schema)
+    val stateDocument =
+      DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
+    val state = State(
+      Map(
+        "loop_counter" -> 3,
+        "name" -> "outer-loop",
+        "payload" -> Array[Byte](0, 1, 2, 3),
+        "nested" -> Map("enabled" -> true, "values" -> List(1, 2, 3))
+      )
+    )
+
+    val writer = stateDocument.writer(UUID.randomUUID().toString)
+    writer.open()
+    writer.putOne(state.toTuple)
+    writer.close()
+
+    val storedRows = stateDocument.get().toList
+    assert(storedRows.length == 1)
+    val deserialized = State.fromTuple(storedRows.head).values
+    assert(deserialized("loop_counter") == 3L)
+    assert(deserialized("name") == "outer-loop")
+    assert(deserialized("payload").asInstanceOf[Array[Byte]].sameElements(Array[Byte](0, 1, 2, 3)))
+    assert(deserialized("nested").asInstanceOf[Map[String, Any]]("enabled") == true)
+    assert(deserialized("nested").asInstanceOf[Map[String, Any]]("values") == List(1L, 2L, 3L))
+  }
+
+  it should "materialize multiple states as rows in one state table" in {
+    val stateUri = VFSURIFactory.stateURI(baseURI)
+    DocumentFactory.createDocument(stateUri, State.schema)
+    val stateDocument =
+      DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
+    val states: List[State] = List(
+      State(Map("loop_counter" -> 0, "i" -> 1, "payload" -> Array[Byte](1, 2, 3))),
+      State(
+        Map(
+          "loop_counter" -> 1,
+          "i" -> 2,
+          "payload" -> Array[Byte](4, 5, 6),
+          "nested" -> Map("values" -> List(3, 4))
+        )
+      )
+    )
+
+    val writer = stateDocument.writer(UUID.randomUUID().toString)
+    writer.open()
+    states.foreach(state => writer.putOne(state.toTuple))
+    writer.close()
+
+    val deserializedStates =
+      stateDocument
+        .get()
+        .toList
+        .map(State.fromTuple)
+        .sortBy(_.values("loop_counter").asInstanceOf[Long])
+    assert(deserializedStates.length == states.length)
+    deserializedStates.zip(states).foreach {
+      case (actual, expected) =>
+        assert(
+          actual.values("loop_counter") == expected.values("loop_counter").asInstanceOf[Int].toLong
+        )
+        assert(actual.values("i") == expected.values("i").asInstanceOf[Int].toLong)
+        assert(
+          actual
+            .values("payload")
+            .asInstanceOf[Array[Byte]]
+            .sameElements(expected.values("payload").asInstanceOf[Array[Byte]])
+        )
+    }
+    assert(
+      deserializedStates(1)
+        .values("nested")
+        .asInstanceOf[Map[String, Any]]("values") == List(3L, 4L)
+    )
+  }
+
   /** Returns a dynamic proxy for `realTable` that increments `counter` on 
every `refresh()` call. */
   private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): 
Table =
     Proxy
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
index b7cf776eb8..956e9f42b9 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
@@ -50,16 +50,18 @@ class IcebergTableStatsSpec extends AnyFlatSpec with 
BeforeAndAfterAll with Suit
   var deserde: (IcebergSchema, Record) => Tuple = _
   var catalog: Catalog = _
   val tableNamespace = "test_namespace"
-  var uri: URI = VFSURIFactory.createResultURI(
-    WorkflowIdentity(0),
-    ExecutionIdentity(0),
-    GlobalPortIdentity(
-      PhysicalOpIdentity(
-        logicalOpId =
-          
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
-        layerName = "main"
-      ),
-      PortIdentity()
+  var uri: URI = VFSURIFactory.resultURI(
+    VFSURIFactory.createPortBaseURI(
+      WorkflowIdentity(0),
+      ExecutionIdentity(0),
+      GlobalPortIdentity(
+        PhysicalOpIdentity(
+          logicalOpId =
+            
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
+          layerName = "main"
+        ),
+        PortIdentity()
+      )
     )
   )
 

Reply via email to