This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5783-2e0dd7ca45e58ae7a036591cd04aa5b156b55103
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 3ced705c721574a06ce7031019fdbbb5ce22106e
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 21 23:36:55 2026 -0700

    refactor(amber): centralize uncaught-exception console reporting (#5783)
    
    ### What changes were proposed in this PR?
    
    `DataProcessor` built the operator-facing ERROR console message for an
    uncaught UDF exception in a private method (`_report_exception`). This
    moves that construction into a small factory —
    `core.util.console_message.error_message.create_error_console_message(worker_id,
    exc_info) -> ConsoleMessage`. `DataProcessor` builds the message via the
    factory and queues it through the existing
    `ConsoleMessageManager.put_message`.
    
    Per review, `ConsoleMessageManager` stays purely about **message
    management** (its interfaces are all at the `msg: ConsoleMessage`
    level); message *construction* lives in the util.
    
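    Behavior-preserving for any exception that carries a traceback: the same
    ERROR `ConsoleMessage` is produced —
    
    | field | value |
    |---|---|
    | `msg_type` | `ConsoleMessageType.ERROR` |
    | `title` | the exception's final formatted line (e.g. `ValueError: ...`) |
    | `message` | the full formatted traceback |
    | `source` | `module:func:line` of the raising (deepest) frame |
    
    One intentional difference: an exception with no traceback frames (e.g. an
    exception object that was never raised) now yields an empty `source`
    instead of raising `IndexError` inside the reporter.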
    
    Centralizing the factory lets other uncaught-exception paths report
    identically; a follow-up PR will reuse it for the loop operators'
    main-loop condition evaluation.
    
    ### Any related issues, documentation, discussions?
    
    Split out of #5700 (loop operators) to keep that PR focused; the
    refactor is independent and behavior-preserving on `main`.
    
    ### How was this PR tested?
    
    - New
    `test_error_message.py::test_builds_error_console_message_from_exc_info`
    pins the factory output (worker id, ERROR type, title, traceback body,
    `module:func:line` source) — written test-first.
    - The existing `test_data_processor.py` (asserts console messages after
    a UDF raises) still passes unchanged, confirming the delegation
    preserves behavior.
    - `cd amber && pytest -m "not integration"` on the affected files: 12
    passed; `black --check` clean.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Co-authored with Claude Opus 4.8 in compliance with the ASF Generative
    Tooling Guidance.
---
 .../main/python/core/runnables/data_processor.py   |  34 +------
 .../core/util/console_message/error_message.py     |  61 ++++++++++++
 .../util/console_message/test_error_message.py     | 102 +++++++++++++++++++++
 3 files changed, 168 insertions(+), 29 deletions(-)

diff --git a/amber/src/main/python/core/runnables/data_processor.py 
b/amber/src/main/python/core/runnables/data_processor.py
index 3998a3ff9a..cf792fd681 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -15,26 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import os
 import sys
-import traceback
 from contextlib import contextmanager
 from loguru import logger
 from threading import Event
 from typing import Iterator, Optional
 
 from core.architecture.managers import Context
-from core.models import ExceptionInfo, State, TupleLike, InternalMarker
+from core.models import State, TupleLike, InternalMarker
 from core.models.internal_marker import StartChannel, EndChannel
 from core.models.table import all_output_to_tuple
 from core.util import Stoppable
+from core.util.console_message.error_message import 
create_error_console_message
 from core.util.console_message.replace_print import replace_print
-from core.util.console_message.timestamp import current_time_in_local_timezone
 from core.util.runnable import Runnable
-from proto.org.apache.texera.amber.engine.architecture.rpc import (
-    ConsoleMessage,
-    ConsoleMessageType,
-)
 
 
 class DataProcessor(Runnable, Stoppable):
@@ -126,7 +120,9 @@ class DataProcessor(Runnable, Stoppable):
             logger.exception(err)
             exc_info = sys.exc_info()
             self._context.exception_manager.set_exception_info(exc_info)
-            self._report_exception(exc_info)
+            self._context.console_message_manager.put_message(
+                create_error_console_message(self._context.worker_id, exc_info)
+            )
         finally:
             self._switch_context()
 
@@ -175,25 +171,5 @@ class DataProcessor(Runnable, Stoppable):
             # This line has no side effects on the current debugger state.
             self._context.debug_manager.debugger.set_trace()
 
-    def _report_exception(self, exc_info: ExceptionInfo):
-        tb = traceback.extract_tb(exc_info[2])
-        filename, line_number, func_name, text = tb[-1]
-        base_name = os.path.basename(filename)
-        module_name, _ = os.path.splitext(base_name)
-        formatted_exception = traceback.format_exception(*exc_info)
-        title: str = formatted_exception[-1].strip()
-        message: str = "\n".join(formatted_exception)
-
-        self._context.console_message_manager.put_message(
-            ConsoleMessage(
-                worker_id=self._context.worker_id,
-                timestamp=current_time_in_local_timezone(),
-                msg_type=ConsoleMessageType.ERROR,
-                source=f"{module_name}:{func_name}:{line_number}",
-                title=title,
-                message=message,
-            )
-        )
-
     def stop(self):
         self._running.clear()
diff --git a/amber/src/main/python/core/util/console_message/error_message.py 
b/amber/src/main/python/core/util/console_message/error_message.py
new file mode 100644
index 0000000000..417d389999
--- /dev/null
+++ b/amber/src/main/python/core/util/console_message/error_message.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import traceback
+
+from core.models import ExceptionInfo
+from core.util.console_message.timestamp import current_time_in_local_timezone
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    ConsoleMessage,
+    ConsoleMessageType,
+)
+
+
+def create_error_console_message(
+    worker_id: str, exc_info: ExceptionInfo
+) -> ConsoleMessage:
+    """Build an ERROR ``ConsoleMessage`` describing ``exc_info``.
+
+    ``title`` is the last formatted line (e.g. ``ValueError: boom``);
+    ``message`` is the full ``traceback.format_exception`` output joined with
+    newlines; ``source`` is ``module:func:line`` of the deepest (raising)
+    frame, or ``""`` when there is no traceback. This only builds the message:
+    callers (e.g. DataProcessor) record the exception with the exception
+    manager and queue the returned message themselves.
+    """
+    tb = traceback.extract_tb(exc_info[2])
+    if tb:
+        filename, line_number, func_name, _ = tb[-1]
+        module_name, _ = os.path.splitext(os.path.basename(filename))
+        source = f"{module_name}:{func_name}:{line_number}"
+    else:
+        # No traceback frames (e.g. an exception object that was never raised).
+        # Still report it: indexing tb[-1] here would raise IndexError.
+        source = ""
+    formatted_exception = traceback.format_exception(*exc_info)
+    title: str = formatted_exception[-1].strip()
+    message: str = "\n".join(formatted_exception)
+
+    return ConsoleMessage(
+        worker_id=worker_id,
+        timestamp=current_time_in_local_timezone(),
+        msg_type=ConsoleMessageType.ERROR,
+        source=source,
+        title=title,
+        message=message,
+    )
diff --git 
a/amber/src/test/python/core/util/console_message/test_error_message.py 
b/amber/src/test/python/core/util/console_message/test_error_message.py
new file mode 100644
index 0000000000..5c6bfeec61
--- /dev/null
+++ b/amber/src/test/python/core/util/console_message/test_error_message.py
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+
+from core.util.console_message.error_message import 
create_error_console_message
+from proto.org.apache.texera.amber.engine.architecture.rpc import 
ConsoleMessageType
+
+
+class TestCreateErrorConsoleMessage:
+    def test_builds_error_console_message_from_exc_info(self):
+        # create_error_console_message turns an exc_info into a single ERROR
+        # ConsoleMessage: title is the exception's final line, message is the
+        # full formatted traceback, and source encodes the raising frame.
+        try:
+            raise ValueError("boom from udf")
+        except ValueError:
+            exc_info = sys.exc_info()
+
+        msg = create_error_console_message("worker-7", exc_info)
+
+        assert msg.worker_id == "worker-7"
+        assert msg.msg_type == ConsoleMessageType.ERROR
+        assert msg.title == "ValueError: boom from udf"
+        assert "Traceback (most recent call last)" in msg.message
+        assert "ValueError: boom from udf" in msg.message
+        # source is "<module>:<func>:<line>"; only module and func are pinned
+        parts = msg.source.split(":")
+        assert len(parts) == 3
+        assert parts[0] == "test_error_message"
+        assert parts[1] == "test_builds_error_console_message_from_exc_info"
+
+    def test_source_points_to_the_deepest_raising_frame(self):
+        # source must encode the frame where the exception was RAISED (the
+        # deepest frame), not where it was caught.
+        def _inner():
+            raise RuntimeError("deep failure")
+
+        try:
+            _inner()
+        except RuntimeError:
+            exc_info = sys.exc_info()
+
+        msg = create_error_console_message("w0", exc_info)
+
+        parts = msg.source.split(":")
+        assert parts[0] == "test_error_message"
+        assert parts[1] == "_inner"  # the raising frame, not the test method
+
+    def test_chained_exception_reports_active_error_with_full_chain(self):
+        # A `raise ... from ...` chain: the title is the active (outer) error,
+        # and the message carries the whole chain including the connector.
+        try:
+            try:
+                raise ValueError("root cause")
+            except ValueError as cause:
+                raise RuntimeError("wrapping error") from cause
+        except RuntimeError:
+            exc_info = sys.exc_info()
+
+        msg = create_error_console_message("w0", exc_info)
+
+        assert msg.title == "RuntimeError: wrapping error"
+        assert "ValueError: root cause" in msg.message
+        assert "RuntimeError: wrapping error" in msg.message
+        assert "direct cause" in msg.message  # chain connector text
+
+    def test_exception_without_message_uses_bare_class_name(self):
+        try:
+            raise RuntimeError  # no args, so the final line has no ": ..." part
+        except RuntimeError:
+            exc_info = sys.exc_info()
+
+        msg = create_error_console_message("w0", exc_info)
+
+        assert msg.msg_type == ConsoleMessageType.ERROR
+        assert msg.title == "RuntimeError"
+
+    def test_missing_traceback_is_reported_without_crashing(self):
+        # An exception object that was never raised has no traceback. The
+        # reporter must still produce a message rather than throw.
+        exc_info = (ValueError, ValueError("never raised"), None)
+
+        msg = create_error_console_message("w0", exc_info)
+
+        assert msg.msg_type == ConsoleMessageType.ERROR
+        assert msg.title == "ValueError: never raised"
+        assert msg.source == ""

Reply via email to