This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new 30ba48c39f refactor(amber): carry LoopStartId/LoopStartStateURI off
State on the StateFrame envelope
30ba48c39f is described below
commit 30ba48c39f21b08a88306de7eff12c67425fbe45
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jun 2 23:00:04 2026 -0700
refactor(amber): carry LoopStartId/LoopStartStateURI off State on the
StateFrame envelope
Review follow-up (r3285892274): user loop expressions are exec'd against
self.state, which the runtime also populated with its own loop bookkeeping,
so a user writing e.g. `LoopStartId = ...` would silently clobber the loop
machinery. loop_counter was already moved off State; this commit does the
same for the LoopStart jump metadata (LoopStartId and LoopStartStateURI).
LoopStartId and LoopStartStateURI now ride the StateFrame envelope and are
materialized as their own columns (parallel to content), exactly like
loop_counter. The runtime owns them end to end: a LoopStart stamps them onto
its output, the nested pass-through carries them, and the matching LoopEnd's
jump reads them from the runtime (captured from the input frame) instead of
executor.state -- so they never enter the user exec namespace. `table` (a
user-facing input) and `output` (the run [...]
Extends the State schema to four columns, and updates the ArrowUtils
Tuple<->Arrow round-trip and materialization-reader tests to cover the new
columns.
Co-Authored-By: Claude Opus 4.8 <[email protected]>
---
.../core/architecture/packaging/output_manager.py | 24 +++++-
amber/src/main/python/core/models/payload.py | 8 +-
amber/src/main/python/core/models/state.py | 35 ++++++--
amber/src/main/python/core/runnables/main_loop.py | 96 ++++++++++++++++------
.../main/python/core/runnables/network_receiver.py | 2 +
.../main/python/core/runnables/network_sender.py | 2 +
.../input_port_materialization_reader_runnable.py | 2 +
.../test/python/core/models/test_loop_operators.py | 9 +-
amber/src/test/python/core/models/test_state.py | 13 ++-
.../test/python/core/runnables/test_main_loop.py | 61 ++++++++++----
...t_input_port_materialization_reader_runnable.py | 21 ++++-
.../org/apache/texera/amber/core/state/State.scala | 23 ++++--
.../apache/texera/amber/util/ArrowUtilsSpec.scala | 24 ++++--
13 files changed, 239 insertions(+), 81 deletions(-)
diff --git
a/amber/src/main/python/core/architecture/packaging/output_manager.py
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index 8df8586926..a84b651f78 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -212,7 +212,12 @@ class OutputManager:
)
def save_state_to_storage_if_needed(
- self, state: State, loop_counter: int, port_id=None
+ self,
+ state: State,
+ loop_counter: int,
+ loop_start_id: str = "",
+ loop_start_state_uri: str = "",
+ port_id=None,
) -> None:
# When port_id is omitted the same state row is fanned out to
# every output port's state table. This mirrors the
@@ -220,7 +225,9 @@ class OutputManager:
# shared context, not per-key data, so every downstream operator
# (and every worker reading the materialization) needs the full
# set.
- element =
PortStorageWriterElement(data_tuple=state.to_tuple(loop_counter))
+ element = PortStorageWriterElement(
+ data_tuple=state.to_tuple(loop_counter, loop_start_id,
loop_start_state_uri)
+ )
if port_id is None:
for writer_queue, _, _ in self._port_state_writers.values():
writer_queue.put(element)
@@ -313,7 +320,11 @@ class OutputManager:
)
def emit_state(
- self, state: State, loop_counter: int
+ self,
+ state: State,
+ loop_counter: int,
+ loop_start_id: str = "",
+ loop_start_state_uri: str = "",
) -> Iterable[typing.Tuple[ActorVirtualIdentity, DataPayload]]:
return chain(
*(
@@ -321,7 +332,12 @@ class OutputManager:
(
receiver,
(
- StateFrame(payload, loop_counter=loop_counter)
+ StateFrame(
+ payload,
+ loop_counter=loop_counter,
+ loop_start_id=loop_start_id,
+ loop_start_state_uri=loop_start_state_uri,
+ )
if isinstance(payload, State)
else self.tuple_to_frame(payload)
),
diff --git a/amber/src/main/python/core/models/payload.py
b/amber/src/main/python/core/models/payload.py
index c2de13c712..3f1ec4e7fe 100644
--- a/amber/src/main/python/core/models/payload.py
+++ b/amber/src/main/python/core/models/payload.py
@@ -35,5 +35,11 @@ class DataFrame(DataPayload):
class StateFrame(DataPayload):
frame: State
# Loop-control bookkeeping owned by the worker runtime, carried alongside
- # the State payload (not inside it). Defaults to 0 for all non-loop state.
+ # the State payload (not inside it) so it never collides with user state.
+ # Defaults are the "no loop" values for all non-loop state.
loop_counter: int = 0
+ # Which LoopStart to jump back to, and the iceberg URI its input is read
+ # from. Set by the runtime on a LoopStart's output, consumed by the
+ # matching LoopEnd. Empty for non-loop / not-yet-stamped state.
+ loop_start_id: str = ""
+ loop_start_state_uri: str = ""
diff --git a/amber/src/main/python/core/models/state.py
b/amber/src/main/python/core/models/state.py
index 242996289e..4fce475e49 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -25,20 +25,39 @@ from .tuple import Tuple
class State(dict):
CONTENT = "content"
- # loop_counter is loop-control bookkeeping owned by the worker runtime, not
- # user state -- it never appears in the content JSON. In memory it rides on
- # the StateFrame envelope; it is materialized/serialized as its own column
- # (parallel to content) by to_tuple(loop_counter). from_tuple() returns the
- # bare State; callers that need the counter read the LOOP_COUNTER column.
+ # Loop-control bookkeeping owned by the worker runtime, NOT user state --
it
+ # never appears in the content JSON. In memory it rides on the StateFrame
+ # envelope; it is materialized/serialized as its own column (parallel to
+ # content) by to_tuple(...). from_tuple() returns the bare State; callers
+ # that need these values read the corresponding columns off the tuple.
LOOP_COUNTER = "loop_counter"
- SCHEMA = Schema(raw_schema={CONTENT: "STRING", LOOP_COUNTER: "LONG"})
+ LOOP_START_ID = "loop_start_id"
+ LOOP_START_STATE_URI = "loop_start_state_uri"
+ SCHEMA = Schema(
+ raw_schema={
+ CONTENT: "STRING",
+ LOOP_COUNTER: "LONG",
+ LOOP_START_ID: "STRING",
+ LOOP_START_STATE_URI: "STRING",
+ }
+ )
def to_json(self) -> str:
return json.dumps(_to_json_value(self), separators=(",", ":"))
- def to_tuple(self, loop_counter: int = 0) -> Tuple:
+ def to_tuple(
+ self,
+ loop_counter: int = 0,
+ loop_start_id: str = "",
+ loop_start_state_uri: str = "",
+ ) -> Tuple:
return Tuple(
- {State.CONTENT: self.to_json(), State.LOOP_COUNTER:
int(loop_counter)},
+ {
+ State.CONTENT: self.to_json(),
+ State.LOOP_COUNTER: int(loop_counter),
+ State.LOOP_START_ID: loop_start_id,
+ State.LOOP_START_STATE_URI: loop_start_state_uri,
+ },
schema=State.SCHEMA,
)
diff --git a/amber/src/main/python/core/runnables/main_loop.py
b/amber/src/main/python/core/runnables/main_loop.py
index 3ab05d61c3..5c5ac15d58 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -84,6 +84,13 @@ class MainLoop(StoppableQueueBlockingRunnable):
super().__init__(self.__class__.__name__, queue=input_queue)
self._input_queue: InternalQueue = input_queue
self._output_queue: InternalQueue = output_queue
+ # Captured from the input StateFrame when a matching LoopEnd
+ # (loop_counter == 0) consumes a state, so
complete()/_jump_to_loop_start
+ # knows which LoopStart to jump back to. This metadata rides the
+ # StateFrame envelope (not user state), so it is no longer read off
+ # executor.state.
+ self._loop_start_id: str = ""
+ self._loop_start_state_uri: str = ""
self.context = Context(worker_id, input_queue)
self._async_rpc_server = AsyncRPCServer(output_queue,
context=self.context)
@@ -94,31 +101,34 @@ class MainLoop(StoppableQueueBlockingRunnable):
target=self.data_processor.run, daemon=True,
name="data_processor_thread"
).start()
- def _attach_loop_start_id(self, output_state: State) -> None:
- if "LoopStartId" in output_state:
- return
- output_state["LoopStartId"] = get_operator_id(self.context.worker_id)
+ def _compute_loop_start_id(self) -> typing.Tuple[str, str]:
+ # A LoopStart stamps its own operator id and the iceberg URI its input
+ # is read from onto the state it emits; the matching LoopEnd reads them
+ # back to jump. These ride the StateFrame envelope, not user state.
+ loop_start_id = get_operator_id(self.context.worker_id)
# The URI lives on the upstream operator's output port (which
# LoopStart's first materialization reader is reading from).
reader_runnables = (
self.context.input_manager.get_input_port_mat_reader_threads()
)
- output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+ loop_start_state_uri = VFSURIFactory.state_uri(
next(iter(reader_runnables.values()))[0].uri
)
+ return loop_start_id, loop_start_state_uri
def _jump_to_loop_start(
self, executor: LoopEndOperator, controller_interface
) -> None:
state = executor.state
controller_interface.jump_to_operator_region(
- JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"]))
+ JumpToOperatorRegionRequest(OperatorIdentity(self._loop_start_id))
)
- uri = state["LoopStartStateURI"]
- # Strip the per-iteration scratch (`table`, `output`) and the
- # loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the
- # user-visible loop state is written back to LoopStart's input.
- for key in ("table", "output", "LoopStartId", "LoopStartStateURI"):
+ uri = self._loop_start_state_uri
+ # Strip the per-iteration scratch (`table`, `output`) so only the
+ # user-visible loop state is written back to LoopStart's input. The
+ # loop metadata (counter, LoopStartId, LoopStartStateURI) is owned by
+ # the runtime and never lived in this dict.
+ for key in ("table", "output"):
state.pop(key, None)
writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
# The back-edge fires only after the matching LoopEnd consumed at
@@ -231,7 +241,12 @@ class MainLoop(StoppableQueueBlockingRunnable):
output_tuple
)
- def process_input_state(self, output_loop_counter: int = 0) -> None:
+ def process_input_state(
+ self,
+ output_loop_counter: int = 0,
+ output_loop_start_id: str = "",
+ output_loop_start_state_uri: str = "",
+ ) -> None:
self._switch_context()
output_state = self.context.state_processing_manager.get_output_state()
if output_state is not None:
@@ -239,11 +254,28 @@ class MainLoop(StoppableQueueBlockingRunnable):
if isinstance(executor, LoopEndOperator):
self.context.output_manager.reset_storage()
elif isinstance(executor, LoopStartOperator):
- self._attach_loop_start_id(output_state)
- self._emit_and_save_state(output_state, output_loop_counter)
+ # A LoopStart stamps its own id/uri onto the state it emits.
+ (
+ output_loop_start_id,
+ output_loop_start_state_uri,
+ ) = self._compute_loop_start_id()
+ self._emit_and_save_state(
+ output_state,
+ output_loop_counter,
+ output_loop_start_id,
+ output_loop_start_state_uri,
+ )
- def _emit_and_save_state(self, state: State, loop_counter: int) -> None:
- for to, batch in self.context.output_manager.emit_state(state,
loop_counter):
+ def _emit_and_save_state(
+ self,
+ state: State,
+ loop_counter: int,
+ loop_start_id: str = "",
+ loop_start_state_uri: str = "",
+ ) -> None:
+ for to, batch in self.context.output_manager.emit_state(
+ state, loop_counter, loop_start_id, loop_start_state_uri
+ ):
self._output_queue.put(
DataElement(
tag=ChannelIdentity(
@@ -252,7 +284,9 @@ class MainLoop(StoppableQueueBlockingRunnable):
payload=batch,
)
)
- self.context.output_manager.save_state_to_storage_if_needed(state,
loop_counter)
+ self.context.output_manager.save_state_to_storage_if_needed(
+ state, loop_counter, loop_start_id, loop_start_state_uri
+ )
def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
"""
@@ -305,19 +339,35 @@ class MainLoop(StoppableQueueBlockingRunnable):
executor = self.context.executor_manager.executor
if isinstance(executor, LoopEndOperator) and in_counter > 0:
- # State belongs to an outer loop: step one level out and forward.
- self._emit_and_save_state(state, in_counter - 1)
+ # State belongs to an outer loop: step one level out and forward,
+ # carrying the outer loop's id/uri unchanged.
+ self._emit_and_save_state(
+ state, in_counter - 1, frame.loop_start_id,
frame.loop_start_state_uri
+ )
self._check_and_process_control()
return
- if isinstance(executor, LoopStartOperator) and "LoopStartStateURI" in
state:
+ if isinstance(executor, LoopStartOperator) and
frame.loop_start_state_uri:
# Outer loop's state flowing through an inner LoopStart: step one
- # level deeper and forward.
- self._emit_and_save_state(state, in_counter + 1)
+ # level deeper and forward, keeping the outer loop's id/uri.
+ self._emit_and_save_state(
+ state, in_counter + 1, frame.loop_start_id,
frame.loop_start_state_uri
+ )
self._check_and_process_control()
return
+ if isinstance(executor, LoopEndOperator):
+ # Matching LoopEnd (in_counter == 0): it will consume this state
and
+ # jump back. Remember which LoopStart to jump to -- it rides the
+ # envelope now, not user state -- for
complete()/_jump_to_loop_start.
+ self._loop_start_id = frame.loop_start_id
+ self._loop_start_state_uri = frame.loop_start_state_uri
+
self.context.state_processing_manager.current_input_state = state
- self.process_input_state(output_loop_counter=in_counter)
+ self.process_input_state(
+ output_loop_counter=in_counter,
+ output_loop_start_id=frame.loop_start_id,
+ output_loop_start_state_uri=frame.loop_start_state_uri,
+ )
self._check_and_process_control()
def _process_start_channel(self) -> None:
diff --git a/amber/src/main/python/core/runnables/network_receiver.py
b/amber/src/main/python/core/runnables/network_receiver.py
index a43f990a5d..2c672a0775 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -99,6 +99,8 @@ class NetworkReceiver(Runnable, Stoppable):
lambda _: StateFrame(
State.from_json(table[State.CONTENT][0].as_py()),
loop_counter=int(table[State.LOOP_COUNTER][0].as_py()),
+ loop_start_id=table[State.LOOP_START_ID][0].as_py(),
+
loop_start_state_uri=table[State.LOOP_START_STATE_URI][0].as_py(),
),
"ECM",
lambda _:
EmbeddedControlMessage().parse(table["payload"][0].as_py()),
diff --git a/amber/src/main/python/core/runnables/network_sender.py
b/amber/src/main/python/core/runnables/network_sender.py
index 3dd077b97d..68d89e0ebf 100644
--- a/amber/src/main/python/core/runnables/network_sender.py
+++ b/amber/src/main/python/core/runnables/network_sender.py
@@ -109,6 +109,8 @@ class NetworkSender(StoppableQueueBlockingRunnable):
{
State.CONTENT: [data_payload.frame.to_json()],
State.LOOP_COUNTER: [int(data_payload.loop_counter)],
+ State.LOOP_START_ID: [data_payload.loop_start_id],
+ State.LOOP_START_STATE_URI:
[data_payload.loop_start_state_uri],
},
schema=State.SCHEMA.as_arrow_schema(),
)
diff --git
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index e0362e8a26..97a24fe073 100644
---
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -163,6 +163,8 @@ class InputPortMaterializationReaderRunnable(Runnable,
Stoppable):
StateFrame(
State.from_tuple(state_row),
loop_counter=state_row[State.LOOP_COUNTER],
+ loop_start_id=state_row[State.LOOP_START_ID],
+
loop_start_state_uri=state_row[State.LOOP_START_STATE_URI],
)
)
diff --git a/amber/src/test/python/core/models/test_loop_operators.py
b/amber/src/test/python/core/models/test_loop_operators.py
index edbc029ad7..d624453653 100644
--- a/amber/src/test/python/core/models/test_loop_operators.py
+++ b/amber/src/test/python/core/models/test_loop_operators.py
@@ -224,13 +224,13 @@ class TestLoopEndMatchingBranch:
# Simulate LoopStart's produced state arriving here.
from pickle import dumps
+ # The content carries only user data (i) and the per-iteration table
+ # scratch. loop_counter / LoopStartId / LoopStartStateURI are
+ # runtime-owned and ride the StateFrame envelope, never the content.
incoming = State(
{
- "loop_counter": 0,
"i": 2,
"table": dumps(Table([Tuple({"v": 1})])),
- "LoopStartId": "outer-loop",
- "LoopStartStateURI": "vfs:///outer",
}
)
@@ -241,9 +241,6 @@ class TestLoopEndMatchingBranch:
# The table is unpickled in-place so condition() can see it as
# a real Table without a second round of deserialization.
assert isinstance(op.state["table"], Table)
- # Loop metadata is preserved so _jump_to_loop_start can read it.
- assert op.state["LoopStartId"] == "outer-loop"
- assert op.state["LoopStartStateURI"] == "vfs:///outer"
def test_condition_evaluates_user_expression_against_stashed_state(self):
op = _StubLoopEnd(update="i += 1", condition_expr="i < 3")
diff --git a/amber/src/test/python/core/models/test_state.py
b/amber/src/test/python/core/models/test_state.py
index e360342bc4..b51a01267b 100644
--- a/amber/src/test/python/core/models/test_state.py
+++ b/amber/src/test/python/core/models/test_state.py
@@ -30,9 +30,16 @@ class TestState:
def test_class_attributes(self):
assert State.CONTENT == "content"
assert State.LOOP_COUNTER == "loop_counter"
- # loop_counter is a sibling column to content (runtime-owned loop
- # bookkeeping), not part of the user state JSON.
- assert State.SCHEMA.get_attr_names() == ["content", "loop_counter"]
+ assert State.LOOP_START_ID == "loop_start_id"
+ assert State.LOOP_START_STATE_URI == "loop_start_state_uri"
+ # The loop-control columns are runtime-owned bookkeeping, sibling to
+ # content, not part of the user state JSON.
+ assert State.SCHEMA.get_attr_names() == [
+ "content",
+ "loop_counter",
+ "loop_start_id",
+ "loop_start_state_uri",
+ ]
def test_json_round_trip_primitives(self):
original = State(
diff --git a/amber/src/test/python/core/runnables/test_main_loop.py
b/amber/src/test/python/core/runnables/test_main_loop.py
index c7980fc3a6..87a372020d 100644
--- a/amber/src/test/python/core/runnables/test_main_loop.py
+++ b/amber/src/test/python/core/runnables/test_main_loop.py
@@ -1151,7 +1151,7 @@ class TestMainLoop:
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
- lambda state, loop_counter: [
+ lambda state, loop_counter, *_: [
(mock_data_output_channel.to_worker_id, StateFrame(state))
],
)
@@ -1363,7 +1363,7 @@ class TestMainLoop:
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
- lambda state, loop_counter: [
+ lambda state, loop_counter, *_: [
(mock_data_output_channel.to_worker_id, StateFrame(state))
],
)
@@ -1419,14 +1419,14 @@ class TestMainLoop:
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
- lambda state, loop_counter: [
+ lambda state, loop_counter, *_: [
(mock_data_output_channel.to_worker_id, StateFrame(state))
],
)
monkeypatch.setattr(
main_loop.context.output_manager,
"save_state_to_storage_if_needed",
- lambda state, loop_counter: saved_states.append(state),
+ lambda state, loop_counter, *_: saved_states.append(state),
)
def fake_switch_context():
@@ -1480,14 +1480,14 @@ class TestMainLoop:
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
- lambda state, loop_counter: [
+ lambda state, loop_counter, *_: [
(mock_data_output_channel.to_worker_id, StateFrame(state))
],
)
monkeypatch.setattr(
main_loop.context.output_manager,
"save_state_to_storage_if_needed",
- lambda state, loop_counter: saved_states.append(state),
+ lambda state, loop_counter, *_: saved_states.append(state),
)
# _send_ecm_to_data_channels touches output_manager state we don't
# set up here; for this test the ECM forwarding is irrelevant -- the
@@ -1547,12 +1547,12 @@ class TestMainLoop:
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
- lambda state, loop_counter: [],
+ lambda state, loop_counter, *_: [],
)
monkeypatch.setattr(
main_loop.context.output_manager,
"save_state_to_storage_if_needed",
- lambda state, loop_counter: save_calls.append(state),
+ lambda state, loop_counter, *_: save_calls.append(state),
)
# Pretend DataProc consumed the input but produced no output.
monkeypatch.setattr(main_loop, "_switch_context", lambda: None)
@@ -1796,7 +1796,12 @@ class TestMainLoop:
# pass-through branches the operator must be skipped entirely.
def _capture_state_emit(self, main_loop, monkeypatch):
- """Stub emit/save/switch; return (emitted, switched) recorders."""
+ """Stub emit/save/switch; return (emitted, switched) recorders.
+
+ Each `emitted` entry is (state, loop_counter, loop_start_id,
+ loop_start_state_uri) so tests can assert the loop metadata the runtime
+ attaches to the StateFrame envelope.
+ """
emitted = []
switched = []
monkeypatch.setattr(main_loop, "_check_and_process_control", lambda:
None)
@@ -1804,21 +1809,27 @@ class TestMainLoop:
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
- lambda state, loop_counter: emitted.append((state, loop_counter))
or [],
+ lambda state, loop_counter, loop_start_id="",
loop_start_state_uri="": (
+ emitted.append(
+ (state, loop_counter, loop_start_id, loop_start_state_uri)
+ )
+ or []
+ ),
)
monkeypatch.setattr(
main_loop.context.output_manager,
"save_state_to_storage_if_needed",
- lambda state, loop_counter: None,
+ lambda state, loop_counter, *_: None,
)
return emitted, switched
def test_loopstart_reentry_increments_counter_and_skips_operator(
self, main_loop, monkeypatch
):
- # A state already carrying LoopStartStateURI is an outer loop's state
- # passing through this inner LoopStart. The runtime forwards it with
- # loop_counter + 1 and must NOT invoke the operator.
+ # A state arriving with a loop_start_state_uri on its envelope is an
+ # outer loop's state passing through this inner LoopStart. The runtime
+ # forwards it with loop_counter + 1 (keeping the outer id/uri) and must
+ # NOT invoke the operator.
class StubLoopStart(LoopStartOperator):
def process_table(self, table, port):
yield
@@ -1827,15 +1838,22 @@ class TestMainLoop:
emitted, switched = self._capture_state_emit(main_loop, monkeypatch)
main_loop._process_state_frame(
- StateFrame(State({"LoopStartStateURI": "vfs:///x", "i": 5}),
loop_counter=1)
+ StateFrame(
+ State({"i": 5}),
+ loop_counter=1,
+ loop_start_id="outer-loop",
+ loop_start_state_uri="vfs:///outer",
+ )
)
assert switched == [], "nested pass-through must not invoke the
operator"
assert len(emitted) == 1
- emitted_state, emitted_counter = emitted[0]
+ emitted_state, emitted_counter, emitted_id, emitted_uri = emitted[0]
assert emitted_counter == 2 # 1 + 1
assert emitted_state["i"] == 5
assert "loop_counter" not in emitted_state # never leaks into State
+ # the outer loop's id/uri ride through unchanged
+ assert (emitted_id, emitted_uri) == ("outer-loop", "vfs:///outer")
def test_loopend_passthrough_decrements_counter_and_skips_operator(
self, main_loop, monkeypatch
@@ -1850,14 +1868,21 @@ class TestMainLoop:
emitted, switched = self._capture_state_emit(main_loop, monkeypatch)
main_loop._process_state_frame(
- StateFrame(State({"outer_var": "v"}), loop_counter=2)
+ StateFrame(
+ State({"outer_var": "v"}),
+ loop_counter=2,
+ loop_start_id="outer-loop",
+ loop_start_state_uri="vfs:///outer",
+ )
)
assert switched == [], "pass-through must not invoke the operator"
assert len(emitted) == 1
- emitted_state, emitted_counter = emitted[0]
+ emitted_state, emitted_counter, emitted_id, emitted_uri = emitted[0]
assert emitted_counter == 1 # 2 - 1
assert emitted_state["outer_var"] == "v"
+ # the outer loop's id/uri ride through unchanged
+ assert (emitted_id, emitted_uri) == ("outer-loop", "vfs:///outer")
def test_loopend_consume_invokes_operator_at_counter_zero(
self, main_loop, monkeypatch
diff --git
a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
index d3f269092b..59635d0e87 100644
---
a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
+++
b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
@@ -63,12 +63,20 @@ class TestRunStateReadingBlock:
state_a = State({"i": 0})
state_b = State({"i": 1})
- # The state document yields opaque 2-column tuples. State.from_tuple
+ # The state document yields opaque multi-column tuples.
State.from_tuple
# (patched) deserializes the content column; the reader reads the
- # loop_counter column directly off the row and carries it onto the
+ # loop-control columns directly off the row and carries them onto the
# emitted StateFrame envelope.
- row_a = {State.LOOP_COUNTER: 0}
- row_b = {State.LOOP_COUNTER: 1}
+ row_a = {
+ State.LOOP_COUNTER: 0,
+ State.LOOP_START_ID: "loop-a",
+ State.LOOP_START_STATE_URI: "vfs:///a",
+ }
+ row_b = {
+ State.LOOP_COUNTER: 1,
+ State.LOOP_START_ID: "loop-b",
+ State.LOOP_START_STATE_URI: "vfs:///b",
+ }
result_doc = MagicMock()
result_doc.get.return_value = iter([]) # No materialized tuples.
state_doc = MagicMock()
@@ -100,4 +108,9 @@ class TestRunStateReadingBlock:
]
assert [sf.payload.frame for sf in state_frames] == [state_a, state_b]
assert [sf.payload.loop_counter for sf in state_frames] == [0, 1]
+ assert [sf.payload.loop_start_id for sf in state_frames] == ["loop-a",
"loop-b"]
+ assert [sf.payload.loop_start_state_uri for sf in state_frames] == [
+ "vfs:///a",
+ "vfs:///b",
+ ]
assert runnable._finished is True
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index 24162de152..92103477d1 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -31,24 +31,35 @@ final case class State(values: Map[String, Any]) {
def toJson: String =
objectMapper.writeValueAsString(State.toJsonValue(values))
- def toTuple(loopCounter: Long = 0L): Tuple =
- Tuple.builder(State.schema).addSequentially(Array(toJson,
Long.box(loopCounter))).build()
+ def toTuple(
+ loopCounter: Long = 0L,
+ loopStartId: String = "",
+ loopStartStateUri: String = ""
+ ): Tuple =
+ Tuple
+ .builder(State.schema)
+ .addSequentially(Array(toJson, Long.box(loopCounter), loopStartId,
loopStartStateUri))
+ .build()
}
object State {
private val Content = "content"
// loop-control bookkeeping owned by the (Python) worker runtime; not user
- // state and never in the content JSON. Materialized as its own column,
- // parallel to content. Scala never originates a non-zero counter (loop
- // operators are Python-only), so toTuple defaults it to 0.
+ // state and never in the content JSON. Materialized as its own columns,
+ // parallel to content. Scala never originates loop state (loop operators are
+ // Python-only), so toTuple defaults these to the "no loop" values.
private val LoopCounter = "loop_counter"
+ private val LoopStartId = "loop_start_id"
+ private val LoopStartStateUri = "loop_start_state_uri"
private val BytesTypeMarker = "__texera_type__"
private val BytesValue = "bytes"
private val PayloadMarker = "payload"
val schema: Schema = new Schema(
new Attribute(Content, AttributeType.STRING),
- new Attribute(LoopCounter, AttributeType.LONG)
+ new Attribute(LoopCounter, AttributeType.LONG),
+ new Attribute(LoopStartId, AttributeType.STRING),
+ new Attribute(LoopStartStateUri, AttributeType.STRING)
)
def fromJson(payload: String): State =
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
index 8ba9d9788b..486deefbcc 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
@@ -294,11 +294,12 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers {
// Arrow -> getTexeraTuple -> Tuple (PythonProxyServer) on the other.
// The schema-only round-trip tests above don't exercise the per-row data
// encode/decode, so a column dropped or mistyped there would slip through.
- // Pin that a two-column State tuple (content STRING + loop_counter LONG)
- // survives a real setTexeraTuple -> Arrow vectors -> getTexeraTuple
- // round-trip with both columns intact -- the property the wire hop relies
- // on after State gained its loop_counter column.
- val original = State(Map("i" -> 5L, "label" -> "outer")).toTuple(3L)
+ // Pin that the full multi-column State tuple (content STRING + the
+ // loop-control columns loop_counter LONG, loop_start_id /
loop_start_state_uri
+ // STRING) survives a real setTexeraTuple -> Arrow vectors ->
getTexeraTuple
+ // round-trip with every column intact -- the property the wire hop relies
on.
+ val original =
+ State(Map("i" -> 5L, "label" -> "outer")).toTuple(3L, "outer-loop",
"vfs:///outer")
val allocator = new RootAllocator()
val root =
VectorSchemaRoot.create(ArrowUtils.fromTexeraSchema(original.getSchema),
allocator)
@@ -309,13 +310,20 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers {
val recovered = ArrowUtils.getTexeraTuple(0, root)
- // Both columns survive the encode/decode, with names and types intact.
+ // Every column survives the encode/decode, with names and types intact.
recovered.getSchema.getAttributes.toList.map(a => (a.getName,
a.getType)) shouldBe
- List(("content", AttributeType.STRING), ("loop_counter",
AttributeType.LONG))
+ List(
+ ("content", AttributeType.STRING),
+ ("loop_counter", AttributeType.LONG),
+ ("loop_start_id", AttributeType.STRING),
+ ("loop_start_state_uri", AttributeType.STRING)
+ )
// content (the user State JSON) round-trips...
State.fromTuple(recovered).values shouldBe Map("i" -> 5L, "label" ->
"outer")
- // ...and so does the loop_counter column.
+ // ...and so do the loop-control columns.
recovered.getField[java.lang.Long]("loop_counter").toLong shouldBe 3L
+ recovered.getField[String]("loop_start_id") shouldBe "outer-loop"
+ recovered.getField[String]("loop_start_state_uri") shouldBe
"vfs:///outer"
} finally {
root.close()
allocator.close()