This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 30ba48c39f refactor(amber): carry LoopStartId/LoopStartStateURI off State on the StateFrame envelope
30ba48c39f is described below

commit 30ba48c39f21b08a88306de7eff12c67425fbe45
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jun 2 23:00:04 2026 -0700

    refactor(amber): carry LoopStartId/LoopStartStateURI off State on the StateFrame envelope
    
    Review follow-up (r3285892274): user loop expressions are exec'd against
    self.state, which the runtime also stuffed with loop bookkeeping -- a user
    writing e.g. `LoopStartId = ...` would silently clobber the loop machinery.
    loop_counter had already been moved off State; this commit does the same
    for the LoopStart jump metadata.
    
    LoopStartId and LoopStartStateURI now ride the StateFrame envelope and are
    materialized as their own columns (parallel to content), exactly like
    loop_counter. The runtime owns them end to end: a LoopStart stamps them onto
    its output, the nested pass-through carries them unchanged, and the
    matching LoopEnd's jump reads them from the runtime (captured from its input
    frame) instead of from executor.state -- so they never enter the user exec
    namespace. `table` (a user-facing input) and `output` (the run [...]
    
    Extends State to four columns, and extends the ArrowUtils Tuple<->Arrow
    round-trip and materialization-reader tests to cover the new columns.
    
    Co-Authored-By: Claude Opus 4.8 <[email protected]>
---
 .../core/architecture/packaging/output_manager.py  | 24 +++++-
 amber/src/main/python/core/models/payload.py       |  8 +-
 amber/src/main/python/core/models/state.py         | 35 ++++++--
 amber/src/main/python/core/runnables/main_loop.py  | 96 ++++++++++++++++------
 .../main/python/core/runnables/network_receiver.py |  2 +
 .../main/python/core/runnables/network_sender.py   |  2 +
 .../input_port_materialization_reader_runnable.py  |  2 +
 .../test/python/core/models/test_loop_operators.py |  9 +-
 amber/src/test/python/core/models/test_state.py    | 13 ++-
 .../test/python/core/runnables/test_main_loop.py   | 61 ++++++++++----
 ...t_input_port_materialization_reader_runnable.py | 21 ++++-
 .../org/apache/texera/amber/core/state/State.scala | 23 ++++--
 .../apache/texera/amber/util/ArrowUtilsSpec.scala  | 24 ++++--
 13 files changed, 239 insertions(+), 81 deletions(-)

diff --git 
a/amber/src/main/python/core/architecture/packaging/output_manager.py 
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index 8df8586926..a84b651f78 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -212,7 +212,12 @@ class OutputManager:
             )
 
     def save_state_to_storage_if_needed(
-        self, state: State, loop_counter: int, port_id=None
+        self,
+        state: State,
+        loop_counter: int,
+        loop_start_id: str = "",
+        loop_start_state_uri: str = "",
+        port_id=None,
     ) -> None:
         # When port_id is omitted the same state row is fanned out to
         # every output port's state table. This mirrors the
@@ -220,7 +225,9 @@ class OutputManager:
         # shared context, not per-key data, so every downstream operator
         # (and every worker reading the materialization) needs the full
         # set.
-        element = 
PortStorageWriterElement(data_tuple=state.to_tuple(loop_counter))
+        element = PortStorageWriterElement(
+            data_tuple=state.to_tuple(loop_counter, loop_start_id, 
loop_start_state_uri)
+        )
         if port_id is None:
             for writer_queue, _, _ in self._port_state_writers.values():
                 writer_queue.put(element)
@@ -313,7 +320,11 @@ class OutputManager:
         )
 
     def emit_state(
-        self, state: State, loop_counter: int
+        self,
+        state: State,
+        loop_counter: int,
+        loop_start_id: str = "",
+        loop_start_state_uri: str = "",
     ) -> Iterable[typing.Tuple[ActorVirtualIdentity, DataPayload]]:
         return chain(
             *(
@@ -321,7 +332,12 @@ class OutputManager:
                     (
                         receiver,
                         (
-                            StateFrame(payload, loop_counter=loop_counter)
+                            StateFrame(
+                                payload,
+                                loop_counter=loop_counter,
+                                loop_start_id=loop_start_id,
+                                loop_start_state_uri=loop_start_state_uri,
+                            )
                             if isinstance(payload, State)
                             else self.tuple_to_frame(payload)
                         ),
diff --git a/amber/src/main/python/core/models/payload.py 
b/amber/src/main/python/core/models/payload.py
index c2de13c712..3f1ec4e7fe 100644
--- a/amber/src/main/python/core/models/payload.py
+++ b/amber/src/main/python/core/models/payload.py
@@ -35,5 +35,11 @@ class DataFrame(DataPayload):
 class StateFrame(DataPayload):
     frame: State
     # Loop-control bookkeeping owned by the worker runtime, carried alongside
-    # the State payload (not inside it). Defaults to 0 for all non-loop state.
+    # the State payload (not inside it) so it never collides with user state.
+    # Defaults are the "no loop" values for all non-loop state.
     loop_counter: int = 0
+    # Which LoopStart to jump back to, and the iceberg URI its input is read
+    # from. Set by the runtime on a LoopStart's output, consumed by the
+    # matching LoopEnd. Empty for non-loop / not-yet-stamped state.
+    loop_start_id: str = ""
+    loop_start_state_uri: str = ""
diff --git a/amber/src/main/python/core/models/state.py 
b/amber/src/main/python/core/models/state.py
index 242996289e..4fce475e49 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -25,20 +25,39 @@ from .tuple import Tuple
 
 class State(dict):
     CONTENT = "content"
-    # loop_counter is loop-control bookkeeping owned by the worker runtime, not
-    # user state -- it never appears in the content JSON. In memory it rides on
-    # the StateFrame envelope; it is materialized/serialized as its own column
-    # (parallel to content) by to_tuple(loop_counter). from_tuple() returns the
-    # bare State; callers that need the counter read the LOOP_COUNTER column.
+    # Loop-control bookkeeping owned by the worker runtime, NOT user state -- 
it
+    # never appears in the content JSON. In memory it rides on the StateFrame
+    # envelope; it is materialized/serialized as its own column (parallel to
+    # content) by to_tuple(...). from_tuple() returns the bare State; callers
+    # that need these values read the corresponding columns off the tuple.
     LOOP_COUNTER = "loop_counter"
-    SCHEMA = Schema(raw_schema={CONTENT: "STRING", LOOP_COUNTER: "LONG"})
+    LOOP_START_ID = "loop_start_id"
+    LOOP_START_STATE_URI = "loop_start_state_uri"
+    SCHEMA = Schema(
+        raw_schema={
+            CONTENT: "STRING",
+            LOOP_COUNTER: "LONG",
+            LOOP_START_ID: "STRING",
+            LOOP_START_STATE_URI: "STRING",
+        }
+    )
 
     def to_json(self) -> str:
         return json.dumps(_to_json_value(self), separators=(",", ":"))
 
-    def to_tuple(self, loop_counter: int = 0) -> Tuple:
+    def to_tuple(
+        self,
+        loop_counter: int = 0,
+        loop_start_id: str = "",
+        loop_start_state_uri: str = "",
+    ) -> Tuple:
         return Tuple(
-            {State.CONTENT: self.to_json(), State.LOOP_COUNTER: 
int(loop_counter)},
+            {
+                State.CONTENT: self.to_json(),
+                State.LOOP_COUNTER: int(loop_counter),
+                State.LOOP_START_ID: loop_start_id,
+                State.LOOP_START_STATE_URI: loop_start_state_uri,
+            },
             schema=State.SCHEMA,
         )
 
diff --git a/amber/src/main/python/core/runnables/main_loop.py 
b/amber/src/main/python/core/runnables/main_loop.py
index 3ab05d61c3..5c5ac15d58 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -84,6 +84,13 @@ class MainLoop(StoppableQueueBlockingRunnable):
         super().__init__(self.__class__.__name__, queue=input_queue)
         self._input_queue: InternalQueue = input_queue
         self._output_queue: InternalQueue = output_queue
+        # Captured from the input StateFrame when a matching LoopEnd
+        # (loop_counter == 0) consumes a state, so 
complete()/_jump_to_loop_start
+        # knows which LoopStart to jump back to. This metadata rides the
+        # StateFrame envelope (not user state), so it is no longer read off
+        # executor.state.
+        self._loop_start_id: str = ""
+        self._loop_start_state_uri: str = ""
 
         self.context = Context(worker_id, input_queue)
         self._async_rpc_server = AsyncRPCServer(output_queue, 
context=self.context)
@@ -94,31 +101,34 @@ class MainLoop(StoppableQueueBlockingRunnable):
             target=self.data_processor.run, daemon=True, 
name="data_processor_thread"
         ).start()
 
-    def _attach_loop_start_id(self, output_state: State) -> None:
-        if "LoopStartId" in output_state:
-            return
-        output_state["LoopStartId"] = get_operator_id(self.context.worker_id)
+    def _compute_loop_start_id(self) -> typing.Tuple[str, str]:
+        # A LoopStart stamps its own operator id and the iceberg URI its input
+        # is read from onto the state it emits; the matching LoopEnd reads them
+        # back to jump. These ride the StateFrame envelope, not user state.
+        loop_start_id = get_operator_id(self.context.worker_id)
         # The URI lives on the upstream operator's output port (which
         # LoopStart's first materialization reader is reading from).
         reader_runnables = (
             self.context.input_manager.get_input_port_mat_reader_threads()
         )
-        output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+        loop_start_state_uri = VFSURIFactory.state_uri(
             next(iter(reader_runnables.values()))[0].uri
         )
+        return loop_start_id, loop_start_state_uri
 
     def _jump_to_loop_start(
         self, executor: LoopEndOperator, controller_interface
     ) -> None:
         state = executor.state
         controller_interface.jump_to_operator_region(
-            JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"]))
+            JumpToOperatorRegionRequest(OperatorIdentity(self._loop_start_id))
         )
-        uri = state["LoopStartStateURI"]
-        # Strip the per-iteration scratch (`table`, `output`) and the
-        # loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the
-        # user-visible loop state is written back to LoopStart's input.
-        for key in ("table", "output", "LoopStartId", "LoopStartStateURI"):
+        uri = self._loop_start_state_uri
+        # Strip the per-iteration scratch (`table`, `output`) so only the
+        # user-visible loop state is written back to LoopStart's input. The
+        # loop metadata (counter, LoopStartId, LoopStartStateURI) is owned by
+        # the runtime and never lived in this dict.
+        for key in ("table", "output"):
             state.pop(key, None)
         writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
         # The back-edge fires only after the matching LoopEnd consumed at
@@ -231,7 +241,12 @@ class MainLoop(StoppableQueueBlockingRunnable):
                     output_tuple
                 )
 
-    def process_input_state(self, output_loop_counter: int = 0) -> None:
+    def process_input_state(
+        self,
+        output_loop_counter: int = 0,
+        output_loop_start_id: str = "",
+        output_loop_start_state_uri: str = "",
+    ) -> None:
         self._switch_context()
         output_state = self.context.state_processing_manager.get_output_state()
         if output_state is not None:
@@ -239,11 +254,28 @@ class MainLoop(StoppableQueueBlockingRunnable):
             if isinstance(executor, LoopEndOperator):
                 self.context.output_manager.reset_storage()
             elif isinstance(executor, LoopStartOperator):
-                self._attach_loop_start_id(output_state)
-            self._emit_and_save_state(output_state, output_loop_counter)
+                # A LoopStart stamps its own id/uri onto the state it emits.
+                (
+                    output_loop_start_id,
+                    output_loop_start_state_uri,
+                ) = self._compute_loop_start_id()
+            self._emit_and_save_state(
+                output_state,
+                output_loop_counter,
+                output_loop_start_id,
+                output_loop_start_state_uri,
+            )
 
-    def _emit_and_save_state(self, state: State, loop_counter: int) -> None:
-        for to, batch in self.context.output_manager.emit_state(state, 
loop_counter):
+    def _emit_and_save_state(
+        self,
+        state: State,
+        loop_counter: int,
+        loop_start_id: str = "",
+        loop_start_state_uri: str = "",
+    ) -> None:
+        for to, batch in self.context.output_manager.emit_state(
+            state, loop_counter, loop_start_id, loop_start_state_uri
+        ):
             self._output_queue.put(
                 DataElement(
                     tag=ChannelIdentity(
@@ -252,7 +284,9 @@ class MainLoop(StoppableQueueBlockingRunnable):
                     payload=batch,
                 )
             )
-        self.context.output_manager.save_state_to_storage_if_needed(state, 
loop_counter)
+        self.context.output_manager.save_state_to_storage_if_needed(
+            state, loop_counter, loop_start_id, loop_start_state_uri
+        )
 
     def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
         """
@@ -305,19 +339,35 @@ class MainLoop(StoppableQueueBlockingRunnable):
         executor = self.context.executor_manager.executor
 
         if isinstance(executor, LoopEndOperator) and in_counter > 0:
-            # State belongs to an outer loop: step one level out and forward.
-            self._emit_and_save_state(state, in_counter - 1)
+            # State belongs to an outer loop: step one level out and forward,
+            # carrying the outer loop's id/uri unchanged.
+            self._emit_and_save_state(
+                state, in_counter - 1, frame.loop_start_id, 
frame.loop_start_state_uri
+            )
             self._check_and_process_control()
             return
-        if isinstance(executor, LoopStartOperator) and "LoopStartStateURI" in 
state:
+        if isinstance(executor, LoopStartOperator) and 
frame.loop_start_state_uri:
             # Outer loop's state flowing through an inner LoopStart: step one
-            # level deeper and forward.
-            self._emit_and_save_state(state, in_counter + 1)
+            # level deeper and forward, keeping the outer loop's id/uri.
+            self._emit_and_save_state(
+                state, in_counter + 1, frame.loop_start_id, 
frame.loop_start_state_uri
+            )
             self._check_and_process_control()
             return
 
+        if isinstance(executor, LoopEndOperator):
+            # Matching LoopEnd (in_counter == 0): it will consume this state 
and
+            # jump back. Remember which LoopStart to jump to -- it rides the
+            # envelope now, not user state -- for 
complete()/_jump_to_loop_start.
+            self._loop_start_id = frame.loop_start_id
+            self._loop_start_state_uri = frame.loop_start_state_uri
+
         self.context.state_processing_manager.current_input_state = state
-        self.process_input_state(output_loop_counter=in_counter)
+        self.process_input_state(
+            output_loop_counter=in_counter,
+            output_loop_start_id=frame.loop_start_id,
+            output_loop_start_state_uri=frame.loop_start_state_uri,
+        )
         self._check_and_process_control()
 
     def _process_start_channel(self) -> None:
diff --git a/amber/src/main/python/core/runnables/network_receiver.py 
b/amber/src/main/python/core/runnables/network_receiver.py
index a43f990a5d..2c672a0775 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -99,6 +99,8 @@ class NetworkReceiver(Runnable, Stoppable):
                 lambda _: StateFrame(
                     State.from_json(table[State.CONTENT][0].as_py()),
                     loop_counter=int(table[State.LOOP_COUNTER][0].as_py()),
+                    loop_start_id=table[State.LOOP_START_ID][0].as_py(),
+                    
loop_start_state_uri=table[State.LOOP_START_STATE_URI][0].as_py(),
                 ),
                 "ECM",
                 lambda _: 
EmbeddedControlMessage().parse(table["payload"][0].as_py()),
diff --git a/amber/src/main/python/core/runnables/network_sender.py 
b/amber/src/main/python/core/runnables/network_sender.py
index 3dd077b97d..68d89e0ebf 100644
--- a/amber/src/main/python/core/runnables/network_sender.py
+++ b/amber/src/main/python/core/runnables/network_sender.py
@@ -109,6 +109,8 @@ class NetworkSender(StoppableQueueBlockingRunnable):
                 {
                     State.CONTENT: [data_payload.frame.to_json()],
                     State.LOOP_COUNTER: [int(data_payload.loop_counter)],
+                    State.LOOP_START_ID: [data_payload.loop_start_id],
+                    State.LOOP_START_STATE_URI: 
[data_payload.loop_start_state_uri],
                 },
                 schema=State.SCHEMA.as_arrow_schema(),
             )
diff --git 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index e0362e8a26..97a24fe073 100644
--- 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -163,6 +163,8 @@ class InputPortMaterializationReaderRunnable(Runnable, 
Stoppable):
                     StateFrame(
                         State.from_tuple(state_row),
                         loop_counter=state_row[State.LOOP_COUNTER],
+                        loop_start_id=state_row[State.LOOP_START_ID],
+                        
loop_start_state_uri=state_row[State.LOOP_START_STATE_URI],
                     )
                 )
 
diff --git a/amber/src/test/python/core/models/test_loop_operators.py 
b/amber/src/test/python/core/models/test_loop_operators.py
index edbc029ad7..d624453653 100644
--- a/amber/src/test/python/core/models/test_loop_operators.py
+++ b/amber/src/test/python/core/models/test_loop_operators.py
@@ -224,13 +224,13 @@ class TestLoopEndMatchingBranch:
         # Simulate LoopStart's produced state arriving here.
         from pickle import dumps
 
+        # The content carries only user data (i) and the per-iteration table
+        # scratch. loop_counter / LoopStartId / LoopStartStateURI are
+        # runtime-owned and ride the StateFrame envelope, never the content.
         incoming = State(
             {
-                "loop_counter": 0,
                 "i": 2,
                 "table": dumps(Table([Tuple({"v": 1})])),
-                "LoopStartId": "outer-loop",
-                "LoopStartStateURI": "vfs:///outer",
             }
         )
 
@@ -241,9 +241,6 @@ class TestLoopEndMatchingBranch:
         # The table is unpickled in-place so condition() can see it as
         # a real Table without a second round of deserialization.
         assert isinstance(op.state["table"], Table)
-        # Loop metadata is preserved so _jump_to_loop_start can read it.
-        assert op.state["LoopStartId"] == "outer-loop"
-        assert op.state["LoopStartStateURI"] == "vfs:///outer"
 
     def test_condition_evaluates_user_expression_against_stashed_state(self):
         op = _StubLoopEnd(update="i += 1", condition_expr="i < 3")
diff --git a/amber/src/test/python/core/models/test_state.py 
b/amber/src/test/python/core/models/test_state.py
index e360342bc4..b51a01267b 100644
--- a/amber/src/test/python/core/models/test_state.py
+++ b/amber/src/test/python/core/models/test_state.py
@@ -30,9 +30,16 @@ class TestState:
     def test_class_attributes(self):
         assert State.CONTENT == "content"
         assert State.LOOP_COUNTER == "loop_counter"
-        # loop_counter is a sibling column to content (runtime-owned loop
-        # bookkeeping), not part of the user state JSON.
-        assert State.SCHEMA.get_attr_names() == ["content", "loop_counter"]
+        assert State.LOOP_START_ID == "loop_start_id"
+        assert State.LOOP_START_STATE_URI == "loop_start_state_uri"
+        # The loop-control columns are runtime-owned bookkeeping, sibling to
+        # content, not part of the user state JSON.
+        assert State.SCHEMA.get_attr_names() == [
+            "content",
+            "loop_counter",
+            "loop_start_id",
+            "loop_start_state_uri",
+        ]
 
     def test_json_round_trip_primitives(self):
         original = State(
diff --git a/amber/src/test/python/core/runnables/test_main_loop.py 
b/amber/src/test/python/core/runnables/test_main_loop.py
index c7980fc3a6..87a372020d 100644
--- a/amber/src/test/python/core/runnables/test_main_loop.py
+++ b/amber/src/test/python/core/runnables/test_main_loop.py
@@ -1151,7 +1151,7 @@ class TestMainLoop:
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "emit_state",
-            lambda state, loop_counter: [
+            lambda state, loop_counter, *_: [
                 (mock_data_output_channel.to_worker_id, StateFrame(state))
             ],
         )
@@ -1363,7 +1363,7 @@ class TestMainLoop:
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "emit_state",
-            lambda state, loop_counter: [
+            lambda state, loop_counter, *_: [
                 (mock_data_output_channel.to_worker_id, StateFrame(state))
             ],
         )
@@ -1419,14 +1419,14 @@ class TestMainLoop:
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "emit_state",
-            lambda state, loop_counter: [
+            lambda state, loop_counter, *_: [
                 (mock_data_output_channel.to_worker_id, StateFrame(state))
             ],
         )
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "save_state_to_storage_if_needed",
-            lambda state, loop_counter: saved_states.append(state),
+            lambda state, loop_counter, *_: saved_states.append(state),
         )
 
         def fake_switch_context():
@@ -1480,14 +1480,14 @@ class TestMainLoop:
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "emit_state",
-            lambda state, loop_counter: [
+            lambda state, loop_counter, *_: [
                 (mock_data_output_channel.to_worker_id, StateFrame(state))
             ],
         )
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "save_state_to_storage_if_needed",
-            lambda state, loop_counter: saved_states.append(state),
+            lambda state, loop_counter, *_: saved_states.append(state),
         )
         # _send_ecm_to_data_channels touches output_manager state we don't
         # set up here; for this test the ECM forwarding is irrelevant -- the
@@ -1547,12 +1547,12 @@ class TestMainLoop:
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "emit_state",
-            lambda state, loop_counter: [],
+            lambda state, loop_counter, *_: [],
         )
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "save_state_to_storage_if_needed",
-            lambda state, loop_counter: save_calls.append(state),
+            lambda state, loop_counter, *_: save_calls.append(state),
         )
         # Pretend DataProc consumed the input but produced no output.
         monkeypatch.setattr(main_loop, "_switch_context", lambda: None)
@@ -1796,7 +1796,12 @@ class TestMainLoop:
     # pass-through branches the operator must be skipped entirely.
 
     def _capture_state_emit(self, main_loop, monkeypatch):
-        """Stub emit/save/switch; return (emitted, switched) recorders."""
+        """Stub emit/save/switch; return (emitted, switched) recorders.
+
+        Each `emitted` entry is (state, loop_counter, loop_start_id,
+        loop_start_state_uri) so tests can assert the loop metadata the runtime
+        attaches to the StateFrame envelope.
+        """
         emitted = []
         switched = []
         monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: 
None)
@@ -1804,21 +1809,27 @@ class TestMainLoop:
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "emit_state",
-            lambda state, loop_counter: emitted.append((state, loop_counter)) 
or [],
+            lambda state, loop_counter, loop_start_id="", 
loop_start_state_uri="": (
+                emitted.append(
+                    (state, loop_counter, loop_start_id, loop_start_state_uri)
+                )
+                or []
+            ),
         )
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "save_state_to_storage_if_needed",
-            lambda state, loop_counter: None,
+            lambda state, loop_counter, *_: None,
         )
         return emitted, switched
 
     def test_loopstart_reentry_increments_counter_and_skips_operator(
         self, main_loop, monkeypatch
     ):
-        # A state already carrying LoopStartStateURI is an outer loop's state
-        # passing through this inner LoopStart. The runtime forwards it with
-        # loop_counter + 1 and must NOT invoke the operator.
+        # A state arriving with a loop_start_state_uri on its envelope is an
+        # outer loop's state passing through this inner LoopStart. The runtime
+        # forwards it with loop_counter + 1 (keeping the outer id/uri) and must
+        # NOT invoke the operator.
         class StubLoopStart(LoopStartOperator):
             def process_table(self, table, port):
                 yield
@@ -1827,15 +1838,22 @@ class TestMainLoop:
         emitted, switched = self._capture_state_emit(main_loop, monkeypatch)
 
         main_loop._process_state_frame(
-            StateFrame(State({"LoopStartStateURI": "vfs:///x", "i": 5}), 
loop_counter=1)
+            StateFrame(
+                State({"i": 5}),
+                loop_counter=1,
+                loop_start_id="outer-loop",
+                loop_start_state_uri="vfs:///outer",
+            )
         )
 
         assert switched == [], "nested pass-through must not invoke the 
operator"
         assert len(emitted) == 1
-        emitted_state, emitted_counter = emitted[0]
+        emitted_state, emitted_counter, emitted_id, emitted_uri = emitted[0]
         assert emitted_counter == 2  # 1 + 1
         assert emitted_state["i"] == 5
         assert "loop_counter" not in emitted_state  # never leaks into State
+        # the outer loop's id/uri ride through unchanged
+        assert (emitted_id, emitted_uri) == ("outer-loop", "vfs:///outer")
 
     def test_loopend_passthrough_decrements_counter_and_skips_operator(
         self, main_loop, monkeypatch
@@ -1850,14 +1868,21 @@ class TestMainLoop:
         emitted, switched = self._capture_state_emit(main_loop, monkeypatch)
 
         main_loop._process_state_frame(
-            StateFrame(State({"outer_var": "v"}), loop_counter=2)
+            StateFrame(
+                State({"outer_var": "v"}),
+                loop_counter=2,
+                loop_start_id="outer-loop",
+                loop_start_state_uri="vfs:///outer",
+            )
         )
 
         assert switched == [], "pass-through must not invoke the operator"
         assert len(emitted) == 1
-        emitted_state, emitted_counter = emitted[0]
+        emitted_state, emitted_counter, emitted_id, emitted_uri = emitted[0]
         assert emitted_counter == 1  # 2 - 1
         assert emitted_state["outer_var"] == "v"
+        # the outer loop's id/uri ride through unchanged
+        assert (emitted_id, emitted_uri) == ("outer-loop", "vfs:///outer")
 
     def test_loopend_consume_invokes_operator_at_counter_zero(
         self, main_loop, monkeypatch
diff --git 
a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
 
b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
index d3f269092b..59635d0e87 100644
--- 
a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
+++ 
b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
@@ -63,12 +63,20 @@ class TestRunStateReadingBlock:
         state_a = State({"i": 0})
         state_b = State({"i": 1})
 
-        # The state document yields opaque 2-column tuples. State.from_tuple
+        # The state document yields opaque multi-column tuples. 
State.from_tuple
         # (patched) deserializes the content column; the reader reads the
-        # loop_counter column directly off the row and carries it onto the
+        # loop-control columns directly off the row and carries them onto the
         # emitted StateFrame envelope.
-        row_a = {State.LOOP_COUNTER: 0}
-        row_b = {State.LOOP_COUNTER: 1}
+        row_a = {
+            State.LOOP_COUNTER: 0,
+            State.LOOP_START_ID: "loop-a",
+            State.LOOP_START_STATE_URI: "vfs:///a",
+        }
+        row_b = {
+            State.LOOP_COUNTER: 1,
+            State.LOOP_START_ID: "loop-b",
+            State.LOOP_START_STATE_URI: "vfs:///b",
+        }
         result_doc = MagicMock()
         result_doc.get.return_value = iter([])  # No materialized tuples.
         state_doc = MagicMock()
@@ -100,4 +108,9 @@ class TestRunStateReadingBlock:
         ]
         assert [sf.payload.frame for sf in state_frames] == [state_a, state_b]
         assert [sf.payload.loop_counter for sf in state_frames] == [0, 1]
+        assert [sf.payload.loop_start_id for sf in state_frames] == ["loop-a", 
"loop-b"]
+        assert [sf.payload.loop_start_state_uri for sf in state_frames] == [
+            "vfs:///a",
+            "vfs:///b",
+        ]
         assert runnable._finished is True
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index 24162de152..92103477d1 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -31,24 +31,35 @@ final case class State(values: Map[String, Any]) {
   def toJson: String =
     objectMapper.writeValueAsString(State.toJsonValue(values))
 
-  def toTuple(loopCounter: Long = 0L): Tuple =
-    Tuple.builder(State.schema).addSequentially(Array(toJson, 
Long.box(loopCounter))).build()
+  def toTuple(
+      loopCounter: Long = 0L,
+      loopStartId: String = "",
+      loopStartStateUri: String = ""
+  ): Tuple =
+    Tuple
+      .builder(State.schema)
+      .addSequentially(Array(toJson, Long.box(loopCounter), loopStartId, 
loopStartStateUri))
+      .build()
 }
 
 object State {
   private val Content = "content"
   // loop-control bookkeeping owned by the (Python) worker runtime; not user
-  // state and never in the content JSON. Materialized as its own column,
-  // parallel to content. Scala never originates a non-zero counter (loop
-  // operators are Python-only), so toTuple defaults it to 0.
+  // state and never in the content JSON. Materialized as its own columns,
+  // parallel to content. Scala never originates loop state (loop operators are
+  // Python-only), so toTuple defaults these to the "no loop" values.
   private val LoopCounter = "loop_counter"
+  private val LoopStartId = "loop_start_id"
+  private val LoopStartStateUri = "loop_start_state_uri"
   private val BytesTypeMarker = "__texera_type__"
   private val BytesValue = "bytes"
   private val PayloadMarker = "payload"
 
   val schema: Schema = new Schema(
     new Attribute(Content, AttributeType.STRING),
-    new Attribute(LoopCounter, AttributeType.LONG)
+    new Attribute(LoopCounter, AttributeType.LONG),
+    new Attribute(LoopStartId, AttributeType.STRING),
+    new Attribute(LoopStartStateUri, AttributeType.STRING)
   )
 
   def fromJson(payload: String): State =
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
index 8ba9d9788b..486deefbcc 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
@@ -294,11 +294,12 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers {
     // Arrow -> getTexeraTuple -> Tuple (PythonProxyServer) on the other.
     // The schema-only round-trip tests above don't exercise the per-row data
     // encode/decode, so a column dropped or mistyped there would slip through.
-    // Pin that a two-column State tuple (content STRING + loop_counter LONG)
-    // survives a real setTexeraTuple -> Arrow vectors -> getTexeraTuple
-    // round-trip with both columns intact -- the property the wire hop relies
-    // on after State gained its loop_counter column.
-    val original = State(Map("i" -> 5L, "label" -> "outer")).toTuple(3L)
+    // Pin that the full multi-column State tuple (content STRING + the
+    // loop-control columns loop_counter LONG, loop_start_id / 
loop_start_state_uri
+    // STRING) survives a real setTexeraTuple -> Arrow vectors -> 
getTexeraTuple
+    // round-trip with every column intact -- the property the wire hop relies 
on.
+    val original =
+      State(Map("i" -> 5L, "label" -> "outer")).toTuple(3L, "outer-loop", 
"vfs:///outer")
 
     val allocator = new RootAllocator()
     val root = 
VectorSchemaRoot.create(ArrowUtils.fromTexeraSchema(original.getSchema), 
allocator)
@@ -309,13 +310,20 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers {
 
       val recovered = ArrowUtils.getTexeraTuple(0, root)
 
-      // Both columns survive the encode/decode, with names and types intact.
+      // Every column survives the encode/decode, with names and types intact.
       recovered.getSchema.getAttributes.toList.map(a => (a.getName, 
a.getType)) shouldBe
-        List(("content", AttributeType.STRING), ("loop_counter", 
AttributeType.LONG))
+        List(
+          ("content", AttributeType.STRING),
+          ("loop_counter", AttributeType.LONG),
+          ("loop_start_id", AttributeType.STRING),
+          ("loop_start_state_uri", AttributeType.STRING)
+        )
       // content (the user State JSON) round-trips...
       State.fromTuple(recovered).values shouldBe Map("i" -> 5L, "label" -> 
"outer")
-      // ...and so does the loop_counter column.
+      // ...and so do the loop-control columns.
       recovered.getField[java.lang.Long]("loop_counter").toLong shouldBe 3L
+      recovered.getField[String]("loop_start_id") shouldBe "outer-loop"
+      recovered.getField[String]("loop_start_state_uri") shouldBe 
"vfs:///outer"
     } finally {
       root.close()
       allocator.close()


Reply via email to