This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5783-44df4f77013208248c0e5e59d7d7b505ec04ad56 in repository https://gitbox.apache.org/repos/asf/texera.git
commit a3c9e559755a4e80e9d92cb1c6eb5f71e4e0a442
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 21 22:34:23 2026 -0700

refactor(amber): centralize uncaught-exception console reporting (#5783)

### What changes were proposed in this PR?

`DataProcessor` built the operator-facing ERROR console message for an uncaught UDF exception inline (`_report_exception`). This PR moves that construction into a small factory, `core.util.console_message.error_message.create_error_console_message(worker_id, exc_info) -> ConsoleMessage`. `DataProcessor` now builds the message via the factory and queues it through the existing `ConsoleMessageManager.put_message`.

Per review, `ConsoleMessageManager` stays purely about **message management** (all of its interfaces operate on an already-built `msg: ConsoleMessage`); message *construction* lives in the util.

Behavior-preserving: for every exception the old inline code handled, the same ERROR `ConsoleMessage` is produced —

| field | value |
|---|---|
| `msg_type` | `ConsoleMessageType.ERROR` |
| `title` | the exception's final line (e.g. `ValueError: ...`) |
| `message` | the full formatted traceback |
| `source` | `module:func:line` of the raising frame |

One deliberate hardening: if the exception carries no traceback frames (e.g. an exception object that was never raised), the factory still returns a message, with an empty `source`. The old inline code would instead have raised `IndexError` on `tb[-1]`.

Centralizing the factory lets other uncaught-exception paths report identically (the loop operators' main-loop condition evaluation reuses it in a follow-up).

### Any related issues, documentation, discussions?

Split out of #5700 (loop operators) to keep that PR focused; the refactor is independent and behavior-preserving on `main`.

### How was this PR tested?

- New `test_error_message.py::test_builds_error_console_message_from_exc_info` pins the factory output (worker id, ERROR type, title, traceback body, `module:func:line` source) — written test-first. Additional cases in the same file cover:
  - the deepest raising frame,
  - `raise ... from ...` chains,
  - exceptions without a message,
  - an exception with no traceback.
- The existing `test_data_processor.py` (which asserts console messages after a UDF raises) still passes unchanged, confirming the delegation preserves behavior.
- `cd amber && pytest -m "not integration"` on the affected files: 12 passed; `black --check` clean. 
### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF. --- .../main/python/core/runnables/data_processor.py | 34 +------ .../core/util/console_message/error_message.py | 61 ++++++++++++ .../util/console_message/test_error_message.py | 102 +++++++++++++++++++++ 3 files changed, 168 insertions(+), 29 deletions(-) diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py index 3998a3ff9a..cf792fd681 100644 --- a/amber/src/main/python/core/runnables/data_processor.py +++ b/amber/src/main/python/core/runnables/data_processor.py @@ -15,26 +15,20 @@ # specific language governing permissions and limitations # under the License. -import os import sys -import traceback from contextlib import contextmanager from loguru import logger from threading import Event from typing import Iterator, Optional from core.architecture.managers import Context -from core.models import ExceptionInfo, State, TupleLike, InternalMarker +from core.models import State, TupleLike, InternalMarker from core.models.internal_marker import StartChannel, EndChannel from core.models.table import all_output_to_tuple from core.util import Stoppable +from core.util.console_message.error_message import create_error_console_message from core.util.console_message.replace_print import replace_print -from core.util.console_message.timestamp import current_time_in_local_timezone from core.util.runnable import Runnable -from proto.org.apache.texera.amber.engine.architecture.rpc import ( - ConsoleMessage, - ConsoleMessageType, -) class DataProcessor(Runnable, Stoppable): @@ -126,7 +120,9 @@ class DataProcessor(Runnable, Stoppable): logger.exception(err) exc_info = sys.exc_info() self._context.exception_manager.set_exception_info(exc_info) - self._report_exception(exc_info) + self._context.console_message_manager.put_message( + create_error_console_message(self._context.worker_id, 
exc_info) + ) finally: self._switch_context() @@ -175,25 +171,5 @@ class DataProcessor(Runnable, Stoppable): # This line has no side effects on the current debugger state. self._context.debug_manager.debugger.set_trace() - def _report_exception(self, exc_info: ExceptionInfo): - tb = traceback.extract_tb(exc_info[2]) - filename, line_number, func_name, text = tb[-1] - base_name = os.path.basename(filename) - module_name, _ = os.path.splitext(base_name) - formatted_exception = traceback.format_exception(*exc_info) - title: str = formatted_exception[-1].strip() - message: str = "\n".join(formatted_exception) - - self._context.console_message_manager.put_message( - ConsoleMessage( - worker_id=self._context.worker_id, - timestamp=current_time_in_local_timezone(), - msg_type=ConsoleMessageType.ERROR, - source=f"{module_name}:{func_name}:{line_number}", - title=title, - message=message, - ) - ) - def stop(self): self._running.clear() diff --git a/amber/src/main/python/core/util/console_message/error_message.py b/amber/src/main/python/core/util/console_message/error_message.py new file mode 100644 index 0000000000..417d389999 --- /dev/null +++ b/amber/src/main/python/core/util/console_message/error_message.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +import traceback + +from core.models import ExceptionInfo +from core.util.console_message.timestamp import current_time_in_local_timezone +from proto.org.apache.texera.amber.engine.architecture.rpc import ( + ConsoleMessage, + ConsoleMessageType, +) + + +def create_error_console_message( + worker_id: str, exc_info: ExceptionInfo +) -> ConsoleMessage: + """Build an ERROR ``ConsoleMessage`` describing ``exc_info``. + + Produces the operator-facing error message for an uncaught exception, + whether it surfaced from a UDF on the data path (DataProcessor) or from a + user expression evaluated on the main loop thread. Sharing this factory + keeps every uncaught-exception path reporting identically; callers are + responsible for recording the exception with the exception manager, + queueing the returned message, and flushing/pausing as appropriate. + """ + tb = traceback.extract_tb(exc_info[2]) + if tb: + filename, line_number, func_name, _ = tb[-1] + module_name, _ = os.path.splitext(os.path.basename(filename)) + source = f"{module_name}:{func_name}:{line_number}" + else: + # No traceback frames (e.g. an exception object that was never raised). + # Still report it -- an error reporter must not itself throw. 
+ source = "" + formatted_exception = traceback.format_exception(*exc_info) + title: str = formatted_exception[-1].strip() + message: str = "\n".join(formatted_exception) + + return ConsoleMessage( + worker_id=worker_id, + timestamp=current_time_in_local_timezone(), + msg_type=ConsoleMessageType.ERROR, + source=source, + title=title, + message=message, + ) diff --git a/amber/src/test/python/core/util/console_message/test_error_message.py b/amber/src/test/python/core/util/console_message/test_error_message.py new file mode 100644 index 0000000000..5c6bfeec61 --- /dev/null +++ b/amber/src/test/python/core/util/console_message/test_error_message.py @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import sys + +from core.util.console_message.error_message import create_error_console_message +from proto.org.apache.texera.amber.engine.architecture.rpc import ConsoleMessageType + + +class TestCreateErrorConsoleMessage: + def test_builds_error_console_message_from_exc_info(self): + # create_error_console_message turns an exc_info into a single ERROR + # ConsoleMessage: title is the exception's final line, message is the + # full formatted traceback, and source encodes the raising frame. 
+ try: + raise ValueError("boom from udf") + except ValueError: + exc_info = sys.exc_info() + + msg = create_error_console_message("worker-7", exc_info) + + assert msg.worker_id == "worker-7" + assert msg.msg_type == ConsoleMessageType.ERROR + assert msg.title == "ValueError: boom from udf" + assert "Traceback (most recent call last)" in msg.message + assert "ValueError: boom from udf" in msg.message + # source encodes "<module>:<func>:<line>" of the raising frame + parts = msg.source.split(":") + assert len(parts) == 3 + assert parts[0] == "test_error_message" + assert parts[1] == "test_builds_error_console_message_from_exc_info" + + def test_source_points_to_the_deepest_raising_frame(self): + # source must encode the frame where the exception was RAISED (the + # deepest frame), not where it was caught. + def _inner(): + raise RuntimeError("deep failure") + + try: + _inner() + except RuntimeError: + exc_info = sys.exc_info() + + msg = create_error_console_message("w0", exc_info) + + parts = msg.source.split(":") + assert parts[0] == "test_error_message" + assert parts[1] == "_inner" # the raising frame, not the test method + + def test_chained_exception_reports_active_error_with_full_chain(self): + # A `raise ... from ...` chain: the title is the active (outer) error, + # and the message carries the whole chain including the connector. 
+ try: + try: + raise ValueError("root cause") + except ValueError as cause: + raise RuntimeError("wrapping error") from cause + except RuntimeError: + exc_info = sys.exc_info() + + msg = create_error_console_message("w0", exc_info) + + assert msg.title == "RuntimeError: wrapping error" + assert "ValueError: root cause" in msg.message + assert "RuntimeError: wrapping error" in msg.message + assert "direct cause" in msg.message # chain connector text + + def test_exception_without_message_uses_bare_class_name(self): + try: + raise RuntimeError + except RuntimeError: + exc_info = sys.exc_info() + + msg = create_error_console_message("w0", exc_info) + + assert msg.msg_type == ConsoleMessageType.ERROR + assert msg.title == "RuntimeError" + + def test_missing_traceback_is_reported_without_crashing(self): + # An exception object that was never raised has no traceback. The + # reporter must still produce a message rather than throw. + exc_info = (ValueError, ValueError("never raised"), None) + + msg = create_error_console_message("w0", exc_info) + + assert msg.msg_type == ConsoleMessageType.ERROR + assert msg.title == "ValueError: never raised" + assert msg.source == ""
