This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 43f276e897 refactor(pyamber): collapse DataProcessor per-call boilerplate (#4685)
43f276e897 is described below
commit 43f276e89721a97f6bc28f037fb3b73d15dc1544
Author: Yicong Huang <[email protected]>
AuthorDate: Tue May 5 07:38:57 2026 -0700
refactor(pyamber): collapse DataProcessor per-call boilerplate (#4685)
### What changes were proposed in this PR?
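Cleanup-only refactor of `core/runnables/data_processor.py`: collapse
the repeated `try / except / finally` + `replace_print` boilerplate in
`process_internal_marker`, `process_state`, and `process_tuple` into a
single `_executor_session` context manager (built with `@contextmanager`)
that yields `(executor, port_id)`. The one-line wrappers
`_pre_loop_checks` and `_post_switch_context_checks` are also inlined as
direct calls to `_check_and_process_debug_command`.
No behavior change.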
### Any related issues, documentation, discussions?
Closes #4684.
### How was this PR tested?
Unit tests added in `test_data_processor.py` (new file) and
`test_main_loop.py`.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7 (Claude Code)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../main/python/core/runnables/data_processor.py | 103 ++++------
.../python/core/runnables/test_data_processor.py | 207 +++++++++++++++++++++
.../main/python/core/runnables/test_main_loop.py | 57 ++++++
3 files changed, 302 insertions(+), 65 deletions(-)
diff --git a/amber/src/main/python/core/runnables/data_processor.py
b/amber/src/main/python/core/runnables/data_processor.py
index 276a1669f5..089a162228 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -18,6 +18,7 @@
import os
import sys
import traceback
+from contextlib import contextmanager
from loguru import logger
from threading import Event
from typing import Iterator, Optional
@@ -49,19 +50,18 @@ class DataProcessor(Runnable, Stoppable):
with self._context.tuple_processing_manager.context_switch_condition:
self._context.tuple_processing_manager.context_switch_condition.wait()
self._running.set()
- self._pre_loop_checks()
+ self._check_and_process_debug_command()
while self._running.is_set():
tpm = self._context.tuple_processing_manager
spm = self._context.state_processing_manager
has_marker = tpm.current_internal_marker is not None
has_state = spm.current_input_state is not None
has_tuple = tpm.current_input_tuple is not None
- queued = has_marker + has_state + has_tuple
# MainLoop is single-threaded and sets at most one of
# current_internal_marker / current_input_state /
# current_input_tuple per cycle before switching to here, so
# exactly one slot must be populated on every iteration.
- if queued != 1:
+ if has_marker + has_state + has_tuple != 1:
raise RuntimeError(
"DataProcessor expected exactly one queued input per "
f"iteration, got marker={has_marker}, state={has_state}, "
@@ -75,34 +75,45 @@ class DataProcessor(Runnable, Stoppable):
self.process_tuple()
def process_internal_marker(self, internal_marker: InternalMarker) -> None:
- try:
- executor = self._context.executor_manager.executor
- port_id =
self._context.tuple_processing_manager.get_input_port_id()
- with replace_print(
- self._context.worker_id,
- self._context.console_message_manager.print_buf,
- ):
- if isinstance(internal_marker, StartChannel):
-
self._set_output_state(executor.produce_state_on_start(port_id))
- elif isinstance(internal_marker, EndChannel):
-
self._set_output_state(executor.produce_state_on_finish(port_id))
- self._switch_context()
- self._set_output_tuple(executor.on_finish(port_id))
-
- except Exception as err:
- logger.exception(err)
- exc_info = sys.exc_info()
- self._context.exception_manager.set_exception_info(exc_info)
- self._report_exception(exc_info)
-
- finally:
- self._switch_context()
+ with self._executor_session() as (executor, port_id):
+ if isinstance(internal_marker, StartChannel):
+
self._set_output_state(executor.produce_state_on_start(port_id))
+ elif isinstance(internal_marker, EndChannel):
+
self._set_output_state(executor.produce_state_on_finish(port_id))
+ # Flush the state to MainLoop before producing tuples so the
+ # state and the tuple stream don't share a single switch.
+ self._switch_context()
+ self._set_output_tuple(executor.on_finish(port_id))
def process_state(self, state: State) -> None:
"""
Process an input marker by invoking appropriate state
or tuple generation based on the marker type.
"""
+ with self._executor_session() as (executor, port_id):
+ self._set_output_state(executor.process_state(state, port_id))
+
+ def process_tuple(self) -> None:
+ """
+ Process an input tuple by invoking the executor's tuple processing
method.
+ """
+ finished_current =
self._context.tuple_processing_manager.finished_current
+ while not finished_current.is_set():
+ with self._executor_session() as (executor, port_id):
+ tuple_ =
self._context.tuple_processing_manager.get_input_tuple()
+ self._set_output_tuple(executor.process_tuple(tuple_, port_id))
+
+ @contextmanager
+ def _executor_session(self):
+ """
+ Open one executor invocation: hand back (executor, port_id) under a
+ print-capture session, route any exception to the exception manager
+ and queue the stack trace as a console message, and always switch
+ back to MainLoop on exit. Reporting must happen *before* the
+ switch: MainLoop's post-switch hook flushes console messages and
+ then enters EXCEPTION_PAUSE, so anything queued after the switch
+ would arrive at the controller only after the worker resumes.
+ """
try:
executor = self._context.executor_manager.executor
port_id =
self._context.tuple_processing_manager.get_input_port_id()
@@ -110,42 +121,15 @@ class DataProcessor(Runnable, Stoppable):
self._context.worker_id,
self._context.console_message_manager.print_buf,
):
- self._set_output_state(executor.process_state(state, port_id))
-
+ yield executor, port_id
except Exception as err:
logger.exception(err)
exc_info = sys.exc_info()
self._context.exception_manager.set_exception_info(exc_info)
self._report_exception(exc_info)
-
finally:
self._switch_context()
- def process_tuple(self) -> None:
- """
- Process an input tuple by invoking the executor's tuple processing
method.
- """
- finished_current =
self._context.tuple_processing_manager.finished_current
- while not finished_current.is_set():
- try:
- executor = self._context.executor_manager.executor
- port_id =
self._context.tuple_processing_manager.get_input_port_id()
- tuple_ =
self._context.tuple_processing_manager.get_input_tuple()
- with replace_print(
- self._context.worker_id,
- self._context.console_message_manager.print_buf,
- ):
- self._set_output_tuple(executor.process_tuple(tuple_,
port_id))
-
- except Exception as err:
- logger.exception(err)
- exc_info = sys.exc_info()
- self._context.exception_manager.set_exception_info(exc_info)
- self._report_exception(exc_info)
-
- finally:
- self._switch_context()
-
def _set_output_tuple(self, output_iterator:
Iterator[Optional[TupleLike]]) -> None:
"""
Set the output tuple after processing by the executor.
@@ -179,7 +163,7 @@ class DataProcessor(Runnable, Stoppable):
with self._context.tuple_processing_manager.context_switch_condition:
self._context.tuple_processing_manager.context_switch_condition.notify()
self._context.tuple_processing_manager.context_switch_condition.wait()
- self._post_switch_context_checks()
+ self._check_and_process_debug_command()
def _check_and_process_debug_command(self) -> None:
"""
@@ -191,17 +175,6 @@ class DataProcessor(Runnable, Stoppable):
# This line has no side effects on the current debugger state.
self._context.debug_manager.debugger.set_trace()
- def _post_switch_context_checks(self):
- self._check_and_process_debug_command()
-
- def _pre_loop_checks(self) -> None:
- # Runs once after init and before the first task so that a debug
- # command queued during worker setup fires before any
- # tuple / state / marker is processed. Only the debug-command
- # check is needed here -- no task has run yet, so there is no
- # exception to surface.
- self._check_and_process_debug_command()
-
def _report_exception(self, exc_info: ExceptionInfo):
tb = traceback.extract_tb(exc_info[2])
filename, line_number, func_name, text = tb[-1]
diff --git a/amber/src/main/python/core/runnables/test_data_processor.py
b/amber/src/main/python/core/runnables/test_data_processor.py
new file mode 100644
index 0000000000..61cc5bf7e9
--- /dev/null
+++ b/amber/src/main/python/core/runnables/test_data_processor.py
@@ -0,0 +1,207 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from core.architecture.managers import Context
+from core.models import State
+from core.models.internal_queue import InternalQueue
+from core.models.internal_marker import EndChannel, StartChannel
+from core.runnables.data_processor import DataProcessor
+from proto.org.apache.texera.amber.engine.architecture.rpc import
ConsoleMessageType
+
+
[email protected]
+def context():
+ return Context(worker_id="test-worker", input_queue=InternalQueue())
+
+
[email protected]
+def data_processor(context, monkeypatch):
+ """
+ DataProcessor with `_switch_context` swapped for a counter so each test
+ can drive the synchronous parts of the per-call boilerplate without
+ blocking on the cross-thread handshake.
+ """
+ dp = DataProcessor(context)
+ dp.switch_calls = 0
+
+ def fake_switch():
+ dp.switch_calls += 1
+
+ monkeypatch.setattr(dp, "_switch_context", fake_switch)
+ return dp
+
+
+class _StubExecutor:
+ """
+ Records what `process_internal_marker` invokes on it so the test can
+ assert the StartChannel / EndChannel branches of `data_processor`
+ without standing up a real Operator.
+ """
+
+ def __init__(self):
+ self.calls = []
+
+ def produce_state_on_start(self, port_id):
+ self.calls.append(("produce_state_on_start", port_id))
+ return {"phase": "start"}
+
+ def produce_state_on_finish(self, port_id):
+ self.calls.append(("produce_state_on_finish", port_id))
+ return {"phase": "finish"}
+
+ def on_finish(self, port_id):
+ self.calls.append(("on_finish", port_id))
+ return iter([])
+
+
+class TestProcessInternalMarker:
+ @pytest.mark.timeout(2)
+ def test_start_channel_invokes_produce_state_on_start(
+ self, context, data_processor
+ ):
+ executor = _StubExecutor()
+ context.executor_manager.executor = executor
+
+ data_processor.process_internal_marker(StartChannel())
+
+ # StartChannel routes to produce_state_on_start with the current
+ # input port id (0 when no upstream is set), and the returned dict
+ # is wrapped into a State on the output slot.
+ assert executor.calls == [("produce_state_on_start", 0)]
+ out = context.state_processing_manager.current_output_state
+ assert isinstance(out, State)
+ assert out["phase"] == "start"
+ # `_executor_session` always switches once on exit.
+ assert data_processor.switch_calls == 1
+
+ @pytest.mark.timeout(2)
+ def test_end_channel_flushes_state_then_drains_on_finish(
+ self, context, data_processor
+ ):
+ executor = _StubExecutor()
+ context.executor_manager.executor = executor
+
+ data_processor.process_internal_marker(EndChannel())
+
+ # EndChannel must call produce_state_on_finish first, switch
+ # context to flush that state separately from the on_finish
+ # tuple stream, then drain on_finish. The session itself adds
+ # its own trailing switch on exit.
+ assert executor.calls == [
+ ("produce_state_on_finish", 0),
+ ("on_finish", 0),
+ ]
+ # 1 switch from the explicit flush + 1 from `_executor_session`
+ # exit. `_set_output_tuple` exits early on an empty iterator and
+ # does not switch.
+ assert data_processor.switch_calls == 2
+
+
+class TestExecutorSession:
+ @pytest.mark.timeout(2)
+ def test_exception_inside_session_is_reported_before_the_switch(
+ self, context, data_processor
+ ):
+ # Order matters: MainLoop's _check_exception flushes pending
+ # console messages and then immediately enters EXCEPTION_PAUSE,
+ # so the stack trace must already be in the buffer at the moment
+ # _executor_session calls _switch_context. Capture the buffer
+ # state from inside the fake switch to pin that ordering.
+ seen_at_switch = []
+
+ def capturing_switch():
+ seen_at_switch.extend(
+ context.console_message_manager.get_messages(force_flush=True)
+ )
+ data_processor.switch_calls += 1
+
+ data_processor._switch_context = capturing_switch
+
+ with data_processor._executor_session() as session:
+ assert session is not None
+ raise RuntimeError("boom-from-executor")
+
+ # Exception was routed into the manager so MainLoop's
+ # _check_exception can see it.
+ assert context.exception_manager.has_exception()
+ exc_info = context.exception_manager.get_exc_info()
+ assert exc_info[0] is RuntimeError
+ assert "boom-from-executor" in str(exc_info[1])
+ # And the stack-trace console message was queued *before* the
+ # finally-clause switch — without this, the worker would pause
+ # before ever sending the error to the controller.
+ assert len(seen_at_switch) == 1
+ msg = seen_at_switch[0]
+ assert msg.worker_id == "test-worker"
+ assert msg.msg_type == ConsoleMessageType.ERROR
+ assert "RuntimeError: boom-from-executor" in msg.title
+ # Exit always switches back to MainLoop, even on the failure path.
+ assert data_processor.switch_calls == 1
+
+ @pytest.mark.timeout(2)
+ def test_clean_session_does_not_record_an_exception(self, context,
data_processor):
+ with data_processor._executor_session():
+ pass
+
+ assert not context.exception_manager.has_exception()
+ assert (
+
list(context.console_message_manager.get_messages(force_flush=True)) == []
+ )
+ # Even on the success path, the finally clause yields control
+ # back to MainLoop exactly once.
+ assert data_processor.switch_calls == 1
+
+
+class TestRunInvariant:
+ """
+ `run()` enforces that exactly one of marker / state / tuple is queued per
+ iteration. The invariant raises a RuntimeError otherwise — that branch
+ is otherwise unreachable in the integration tests, so cover it directly.
+ """
+
+ @staticmethod
+ def _drive_run_synchronously(context, monkeypatch) -> DataProcessor:
+ # `run()` opens with a condition.wait() so MainLoop can hand off
+ # control. Stub that out so the test thread can call run() directly
+ # and reach the invariant check on the very first iteration without
+ # any cross-thread coordination.
+ cond = context.tuple_processing_manager.context_switch_condition
+ monkeypatch.setattr(cond, "wait", lambda *a, **kw: None)
+ return DataProcessor(context)
+
+ @pytest.mark.timeout(2)
+ def test_zero_queued_inputs_raises_invariant_error(self, context,
monkeypatch):
+ dp = self._drive_run_synchronously(context, monkeypatch)
+ # Nothing is set on tpm/spm — has_marker + has_state + has_tuple == 0.
+ with pytest.raises(RuntimeError) as excinfo:
+ dp.run()
+ assert "expected exactly one queued input" in str(excinfo.value)
+ assert "marker=False, state=False, tuple=False" in str(excinfo.value)
+
+ @pytest.mark.timeout(2)
+ def test_two_queued_inputs_raises_invariant_error(self, context,
monkeypatch):
+ dp = self._drive_run_synchronously(context, monkeypatch)
+ # Populate two slots — has_marker + has_tuple == 2.
+ context.tuple_processing_manager.current_internal_marker =
StartChannel()
+ context.tuple_processing_manager.current_input_tuple = ("payload",)
+ with pytest.raises(RuntimeError) as excinfo:
+ dp.run()
+ assert "expected exactly one queued input" in str(excinfo.value)
+ assert "marker=True" in str(excinfo.value)
+ assert "tuple=True" in str(excinfo.value)
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py
b/amber/src/main/python/core/runnables/test_main_loop.py
index c9daa633f5..400a7f2a90 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -20,6 +20,7 @@ import pandas
import pickle
import pyarrow
import pytest
+import sys
import time
from threading import Thread
@@ -48,6 +49,8 @@ from proto.org.apache.texera.amber.core import (
OpExecInitInfo,
EmbeddedControlMessageIdentity,
)
+from core.architecture.managers.pause_manager import PauseType
+from core.util.console_message.timestamp import current_time_in_local_timezone
from proto.org.apache.texera.amber.engine.architecture.rpc import (
ControlRequest,
AssignPortRequest,
@@ -65,6 +68,8 @@ from proto.org.apache.texera.amber.engine.architecture.rpc
import (
WorkerStateResponse,
EmbeddedControlMessageType,
EmbeddedControlMessage,
+ ConsoleMessage,
+ ConsoleMessageType,
)
from proto.org.apache.texera.amber.engine.architecture.sendsemantics import (
OneToOnePartitioning,
@@ -1563,3 +1568,55 @@ class TestMainLoop:
assert output_state["processed_marker"] == "executed"
reraise()
+
+ @pytest.mark.timeout(2)
+ def test_console_message_rpc_fires_before_exception_pause(
+ self, main_loop, monkeypatch
+ ):
+ # Pin the controller-facing contract: when DataProcessor raises
+ # during an executor call, the stack-trace ConsoleMessage must
+ # reach the controller *before* the worker enters EXCEPTION_PAUSE
+ # — otherwise the UI sees a paused worker with no error to show
+ # until the user resumes. The DataProcessor side queues the
+ # message before the switch (covered by
+ # test_data_processor.TestExecutorSession); this test pins the
+ # MainLoop side: post-switch hook flushes RPCs first, pauses last.
+ events = []
+
+ monkeypatch.setattr(
+ main_loop,
+ "_send_console_message",
+ lambda msg: events.append(("rpc", msg)),
+ )
+ monkeypatch.setattr(
+ main_loop.context.pause_manager,
+ "pause",
+ lambda pause_type, change_state=True: events.append(("pause",
pause_type)),
+ )
+
+ try:
+ raise RuntimeError("boom-from-executor")
+ except RuntimeError:
+ exc_info = sys.exc_info()
+ main_loop.context.exception_manager.set_exception_info(exc_info)
+ main_loop.context.console_message_manager.put_message(
+ ConsoleMessage(
+ worker_id="dummy_worker_id",
+ timestamp=current_time_in_local_timezone(),
+ msg_type=ConsoleMessageType.ERROR,
+ source="test:_capture_exc_info:0",
+ title="RuntimeError: boom-from-executor",
+ message="RuntimeError: boom-from-executor",
+ )
+ )
+
+ main_loop._post_switch_context_checks()
+
+ kinds = [e[0] for e in events]
+ assert kinds == ["rpc", "pause"], (
+ "console message must reach controller before pause; "
+ f"observed order: {kinds}"
+ )
+ assert events[0][1].msg_type == ConsoleMessageType.ERROR
+ assert "boom-from-executor" in events[0][1].title
+ assert events[1][1] is PauseType.EXCEPTION_PAUSE