This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 9474b9de2b fix(amber): emit Python operator state outputs reliably (#4560)
9474b9de2b is described below

commit 9474b9de2bd2acea85ca67577b30bcf39c3a3bf0
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 30 16:23:07 2026 -0700

    fix(amber): emit Python operator state outputs reliably (#4560)
    
    ### What changes were proposed in this PR?
    
    Restores reliable state-output emission for Python operators after the
    #4552 revert. After this PR, both per-input-state outputs
    (`Operator.process_state(...)`) and the end-of-input-port output
    (`Operator.produce_state_on_finish(...)`) reach downstream channels.
    
    `MainLoop.process_input_state` previously made two `_switch_context()`
    calls and read `current_output_state` between them. The executor only
    writes that field during the *second* switch, so `MainLoop` always
    captured the previous cycle's value, and the finish state set on
    `EndChannel` landed in `current_output_state` after `MainLoop` had
    returned, where it was never read. This PR collapses the handshake to
    a single switch followed by the read, drops the duplicate post-init
    and end-of-body switches in `DataProcessor.run`, and makes the run
    loop's input dispatch peek-then-consume so that
    `current_internal_marker` keeps the atomic single-consume semantics
    whose absence was the root cause of #4545.
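
The stale read described above can be reproduced in a toy, single-threaded model. This is only a sketch under stated assumptions, not the Texera implementation: Python generators stand in for the two threads, `next()` stands in for `_switch_context()`, and `ToySlots`, `old_processor`, `new_processor`, and `run` are hypothetical names introduced for illustration. It also assumes that reading the output slot clears it.

```python
# Toy, single-threaded model of the MainLoop <-> DataProcessor handshake.
# Every name here is hypothetical; generators stand in for the real threads.


class ToySlots:
    """Stand-in for the shared state-processing slots."""

    def __init__(self):
        self.current_input_state = None
        self.current_output_state = None

    def get_output_state(self):
        # Assumption: reading the output slot also clears it.
        out, self.current_output_state = self.current_output_state, None
        return out


def old_processor(slots):
    # Old shape: the output is written only when resumed a second time.
    while True:
        pending = slots.current_input_state
        slots.current_input_state = None
        yield  # control returns to the main loop after the first switch
        slots.current_output_state = f"processed({pending})"
        yield  # control returns to the main loop after the second switch


def new_processor(slots):
    # New shape: consume the input and write the output in one resume.
    while True:
        pending = slots.current_input_state
        slots.current_input_state = None
        slots.current_output_state = f"processed({pending})"
        yield


def old_process_input_state(slots, switch, state):
    slots.current_input_state = state
    switch()
    out = slots.get_output_state()  # read happens between the two switches
    switch()
    return out


def new_process_input_state(slots, switch, state):
    slots.current_input_state = state
    switch()
    return slots.get_output_state()  # read happens after the only switch


def run(make_processor, process_input_state, states):
    slots = ToySlots()
    processor = make_processor(slots)

    def switch():
        next(processor)  # hand control to the processor until it yields

    emitted = [process_input_state(slots, switch, s) for s in states]
    return emitted, slots.current_output_state


old_emitted, old_leftover = run(old_processor, old_process_input_state, ["s1", "s2"])
new_emitted, new_leftover = run(new_processor, new_process_input_state, ["s1", "s2"])
print(old_emitted, old_leftover)
print(new_emitted, new_leftover)
```

In this model the old shape emits each output one cycle late and leaves the last one stranded in the slot, which is analogous to the finish state on `EndChannel` that was never read. The single-switch shape emits every output in its own cycle and leaves nothing behind.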
    
    <details>
    <summary>History — third attempt at this fix</summary>
    
    - #4421 reported that a Python operator could process its first state
    input but not its second.
    - PR #4424 added three `_switch_context()` calls to keep `MainLoop` and
    `DataProcessor` in sync, closed #4421, but changed
    `current_internal_marker` lifetime and broke the source-propagation case
    in `ReconfigurationSpec` (#4545).
    - PR #4547 tried to restore atomic marker consumption on top of #4424
    and re-enabled the source-propagation case in `ReconfigurationSpec`. CI
    continued to fail.
    - PR #4552 reverted #4424 outright as a stop-gap. State-processing is
    back to its pre-#4424 broken state — see #4559.
    
    </details>
    
    ### Any related issues, documentation, discussions?
    
    Fixes #4559. Follow-up to #4421 / #4424 / #4545 / #4547 / #4552.
    
    ### How was this PR tested?
    
    Existing `core/runnables/test_main_loop.py` tests pass unchanged. Added
    three new tests:
    
    - `test_process_state_can_emit_multiple_states` — stub-level coverage of
    the #4421 "second state not processed" scenario.
    - `test_main_loop_thread_can_process_state` — full real-thread coverage
    of state DataElements and `produce_state_on_finish` on `EndChannel`.
    Times out on plain `main` (#4559); passes on this branch.
    - `test_main_loop_thread_can_process_state_after_tuple` — coverage for
    the mixed `tuple → state` input sequence.
    
    `ReconfigurationSpec`'s source-propagation case (re-enabled in #4547)
    should be re-run on this branch to confirm the new handshake does not
    re-introduce #4545.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Anthropic Claude Opus 4.7
---
 .../main/python/core/runnables/data_processor.py   |  42 ++-
 amber/src/main/python/core/runnables/main_loop.py  |   1 -
 .../main/python/core/runnables/test_main_loop.py   | 290 +++++++++++++++++++++
 3 files changed, 320 insertions(+), 13 deletions(-)

diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py
index 4399b1a3a2..35d2a75d1d 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -49,20 +49,30 @@ class DataProcessor(Runnable, Stoppable):
         with self._context.tuple_processing_manager.context_switch_condition:
             self._context.tuple_processing_manager.context_switch_condition.wait()
         self._running.set()
-        self._switch_context()
+        self._pre_loop_checks()
         while self._running.is_set():
-            marker = self._context.tuple_processing_manager.get_internal_marker()
-            state = self._context.state_processing_manager.get_input_state()
-            tuple_ = self._context.tuple_processing_manager.current_input_tuple
-            if marker is not None:
-                self.process_internal_marker(marker)
-            elif state is not None:
-                self.process_state(state)
-            elif tuple_ is not None:
-                self.process_tuple()
+            tpm = self._context.tuple_processing_manager
+            spm = self._context.state_processing_manager
+            has_marker = tpm.current_internal_marker is not None
+            has_state = spm.current_input_state is not None
+            has_tuple = tpm.current_input_tuple is not None
+            queued = has_marker + has_state + has_tuple
+            # MainLoop is single-threaded and sets at most one of
+            # current_internal_marker / current_input_state /
+            # current_input_tuple per cycle before switching to here, so
+            # exactly one slot must be populated on every iteration.
+            if queued != 1:
+                raise RuntimeError(
+                    "DataProcessor expected exactly one queued input per "
+                    f"iteration, got marker={has_marker}, state={has_state}, "
+                    f"tuple={has_tuple}"
+                )
+            if has_marker:
+                self.process_internal_marker(tpm.get_internal_marker())
+            elif has_state:
+                self.process_state(spm.get_input_state())
             else:
-                raise RuntimeError("No marker or tuple to process.")
-            self._switch_context()
+                self.process_tuple()
 
     def process_internal_marker(self, internal_marker: InternalMarker) -> None:
         try:
@@ -182,6 +192,14 @@ class DataProcessor(Runnable, Stoppable):
     def _post_switch_context_checks(self):
         self._check_and_process_debug_command()
 
+    def _pre_loop_checks(self) -> None:
+        # Runs once after init and before the first task so that a debug
+        # command queued during worker setup fires before any
+        # tuple / state / marker is processed. Only the debug-command
+        # check is needed here -- no task has run yet, so there is no
+        # exception to surface.
+        self._check_and_process_debug_command()
+
     def _report_exception(self, exc_info: ExceptionInfo):
         tb = traceback.extract_tb(exc_info[2])
         filename, line_number, func_name, text = tb[-1]
diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index cde2847206..ab35cda81b 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -192,7 +192,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
     def process_input_state(self) -> None:
         self._switch_context()
         output_state = self.context.state_processing_manager.get_output_state()
-        self._switch_context()
         if output_state is not None:
             for to, batch in self.context.output_manager.emit_state(output_state):
                 self._output_queue.put(
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py
index 5612e4b41a..cc6969d964 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -26,6 +26,8 @@ from threading import Thread
 from core.models import (
     DataFrame,
     InternalQueue,
+    State,
+    StateFrame,
     Tuple,
 )
 from core.models.internal_queue import (
@@ -160,6 +162,57 @@ class TestMainLoop:
             ),
         )
 
+    @pytest.fixture
+    def mock_state_data_elements(self, mock_data_input_channel):
+        elements = []
+        for value in (1, 2, 3, 4):
+            state = State()
+            state.add("value", value)
+            elements.append(
+                DataElement(
+                    tag=mock_data_input_channel,
+                    payload=StateFrame(frame=state),
+                )
+            )
+        return elements
+
+    @pytest.fixture
+    def state_processing_executor(self):
+        # In-process executor for the state-pipeline tests. Tags processed
+        # states with `processed_marker` and emits a finish-marker state
+        # from `produce_state_on_finish` so EndChannel handling can be
+        # observed.
+        class StateProcessingExecutor:
+            @staticmethod
+            def process_tuple(tuple_, port):
+                yield tuple_
+
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                new_state = State()
+                for key, value in state.__dict__.items():
+                    if key != "schema":
+                        new_state.add(key, value)
+                new_state.add("processed_marker", "executed")
+                new_state.add("port", port)
+                return new_state
+
+            @staticmethod
+            def produce_state_on_finish(port: int) -> State:
+                finish_state = State()
+                finish_state.add("finish_marker", "produce_state_on_finish_ran")
+                return finish_state
+
+            @staticmethod
+            def on_finish(port):
+                yield
+
+            @staticmethod
+            def close():
+                pass
+
+        return StateProcessingExecutor()
+
     @pytest.fixture
     def mock_binary_data_element(self, mock_binary_tuple, mock_data_input_channel):
         return DataElement(
@@ -1231,3 +1284,240 @@ class TestMainLoop:
             "test-1"
         ] == b"pickle    " + pickle.dumps(mock_binary_tuple["test-1"])
         reraise()
+
+    @pytest.mark.timeout(2)
+    def test_process_state_can_emit_multiple_states(
+        self,
+        main_loop,
+        output_queue,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        # Stub-level coverage of the single-switch state handshake. Each
+        # call to the (stubbed) _switch_context simulates DataProc
+        # consuming the queued input state and writing
+        # current_output_state, mirroring what real DataProc.process_state
+        # does between MainLoop's switches.
+        class DummyExecutor:
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                output_state = State()
+                output_state.add("value", state["value"] + 1)
+                output_state.add("port", port)
+                return output_state
+
+        main_loop.context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))],
+        )
+
+        def fake_switch_context():
+            current_input_state = (
+                main_loop.context.state_processing_manager.current_input_state
+            )
+            if current_input_state is not None:
+                main_loop.context.state_processing_manager.current_output_state = (
+                    DummyExecutor.process_state(current_input_state, 0)
+                )
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        first_state = State()
+        first_state.add("value", 1)
+        second_state = State()
+        second_state.add("value", 41)
+
+        main_loop._process_state(first_state)
+        main_loop._process_state(second_state)
+
+        first_output: DataElement = output_queue.get()
+        second_output: DataElement = output_queue.get()
+
+        assert first_output.tag == mock_data_output_channel
+        assert isinstance(first_output.payload, StateFrame)
+        assert first_output.payload.frame["value"] == 2
+        assert first_output.payload.frame["port"] == 0
+
+        assert second_output.tag == mock_data_output_channel
+        assert isinstance(second_output.payload, StateFrame)
+        assert second_output.payload.frame["value"] == 42
+        assert second_output.payload.frame["port"] == 0
+
+    @pytest.mark.timeout(2)
+    def test_main_loop_thread_can_process_state(
+        self,
+        mock_data_output_channel,
+        mock_control_output_channel,
+        input_queue,
+        output_queue,
+        main_loop,
+        main_loop_thread,
+        mock_assign_input_port,
+        mock_assign_output_port,
+        mock_add_input_channel,
+        mock_add_partitioning,
+        mock_initialize_executor,
+        mock_state_data_elements,
+        mock_end_of_upstream,
+        state_processing_executor,
+        command_sequence,
+        reraise,
+    ):
+        # End-to-end coverage of the state-processing path through the real
+        # MainLoop + DataProcessor threads. The single-switch state handshake
+        # in MainLoop.process_input_state means each state is emitted in its
+        # own cycle (no lag), and an EndChannel ECM after the last state
+        # produces an additional output via produce_state_on_finish.
+        main_loop_thread.start()
+
+        for setup_msg in [
+            mock_assign_input_port,
+            mock_assign_output_port,
+            mock_add_input_channel,
+            mock_add_partitioning,
+            mock_initialize_executor,
+        ]:
+            input_queue.put(setup_msg)
+            assert output_queue.get() == DCMElement(
+                tag=mock_control_output_channel,
+                payload=DirectControlMessagePayloadV2(
+                    return_invocation=ReturnInvocation(
+                        command_id=command_sequence,
+                        return_value=ControlReturn(empty_return=EmptyReturn()),
+                    )
+                ),
+            )
+
+        # Going through the InitializeExecutor RPC above sets up the rest of
+        # the worker state (output schema, partitioning bookkeeping). Swap
+        # the executor instance with the test helper here so the test can
+        # assert the executor's process_state and produce_state_on_finish
+        # actually ran, without depending on Python's cross-test module
+        # caching for operator classes loaded via OpExecWithCode.
+        main_loop.context.executor_manager.executor = state_processing_executor
+
+        # Send four states. With the lag-free state pipeline we expect each
+        # state to produce its own output in order.
+        for state_element in mock_state_data_elements:
+            input_queue.put(state_element)
+
+        for expected_value in (1, 2, 3, 4):
+            output_data_element: DataElement = output_queue.get()
+            assert output_data_element.tag == mock_data_output_channel
+            assert isinstance(output_data_element.payload, StateFrame), (
+                f"expected StateFrame for value={expected_value}, got "
+                f"{type(output_data_element.payload).__name__}"
+            )
+            output_state = output_data_element.payload.frame
+            assert output_state["value"] == expected_value, (
+                f"state outputs arrived out of order: expected value="
+                f"{expected_value}, got value={output_state['value']}"
+            )
+            assert output_state["processed_marker"] == "executed"
+            assert output_state["port"] == 0
+
+        # Send EndChannel to drive _process_end_channel. The executor's
+        # produce_state_on_finish writes a finish-marker state into
+        # current_output_state inside DataProc's process_internal_marker;
+        # MainLoop's process_input_state then emits it.
+        input_queue.put(mock_end_of_upstream)
+
+        # Drain the control reply messages so the next data
+        # output_queue.get() returns the post-EndChannel data emission.
+        output_queue.disable_data(InternalQueue.DisableType.DISABLE_BY_PAUSE)
+        for _ in range(3):
+            control_reply = output_queue.get()
+            assert isinstance(control_reply, DCMElement), (
+                f"expected DCMElement during EndChannel teardown, got "
+                f"{type(control_reply).__name__}"
+            )
+        output_queue.enable_data(InternalQueue.DisableType.DISABLE_BY_PAUSE)
+
+        end_channel_state_output: DataElement = output_queue.get()
+        assert end_channel_state_output.tag == mock_data_output_channel
+        assert isinstance(end_channel_state_output.payload, StateFrame), (
+            f"expected StateFrame for the EndChannel-driven emission, got "
+            f"{type(end_channel_state_output.payload).__name__}"
+        )
+        end_channel_state = end_channel_state_output.payload.frame
+        assert "finish_marker" in end_channel_state.__dict__, (
+            f"EndChannel emission should be the finish-marker state from "
+            f"produce_state_on_finish, got {end_channel_state!r}"
+        )
+        assert end_channel_state["finish_marker"] == "produce_state_on_finish_ran"
+
+        reraise()
+
+    @pytest.mark.timeout(2)
+    def test_main_loop_thread_can_process_state_after_tuple(
+        self,
+        mock_data_output_channel,
+        mock_control_output_channel,
+        input_queue,
+        output_queue,
+        main_loop,
+        main_loop_thread,
+        mock_assign_input_port,
+        mock_assign_output_port,
+        mock_add_input_channel,
+        mock_add_partitioning,
+        mock_initialize_executor,
+        mock_data_element,
+        mock_state_data_elements,
+        state_processing_executor,
+        command_sequence,
+        reraise,
+    ):
+        # Coverage for the mixed (tuple, then state) input sequence: a
+        # tuple followed by several state DataElements should still emit
+        # every state's processed output in order.
+        main_loop_thread.start()
+
+        for setup_msg in [
+            mock_assign_input_port,
+            mock_assign_output_port,
+            mock_add_input_channel,
+            mock_add_partitioning,
+            mock_initialize_executor,
+        ]:
+            input_queue.put(setup_msg)
+            assert output_queue.get() == DCMElement(
+                tag=mock_control_output_channel,
+                payload=DirectControlMessagePayloadV2(
+                    return_invocation=ReturnInvocation(
+                        command_id=command_sequence,
+                        return_value=ControlReturn(empty_return=EmptyReturn()),
+                    )
+                ),
+            )
+
+        main_loop.context.executor_manager.executor = state_processing_executor
+
+        # Tuple first, then four states.
+        input_queue.put(mock_data_element)
+        warmup_output: DataElement = output_queue.get()
+        assert warmup_output.tag == mock_data_output_channel
+        assert isinstance(warmup_output.payload, DataFrame)
+
+        for state_element in mock_state_data_elements:
+            input_queue.put(state_element)
+
+        for expected_value in (1, 2, 3, 4):
+            output_data_element: DataElement = output_queue.get()
+            assert output_data_element.tag == mock_data_output_channel
+            assert isinstance(output_data_element.payload, StateFrame), (
+                f"expected StateFrame for value={expected_value}, got "
+                f"{type(output_data_element.payload).__name__}"
+            )
+            output_state = output_data_element.payload.frame
+            assert output_state["value"] == expected_value, (
+                f"state outputs after a tuple arrived out of order: "
+                f"expected value={expected_value}, "
+                f"got value={output_state['value']}"
+            )
+            assert output_state["processed_marker"] == "executed"
+
+        reraise()
