This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new b2f178d585 test(amber-python): add unit tests for Udon debugger (#4510)
b2f178d585 is described below
commit b2f178d585b17b4b1b0400ef15355ea9460f52ee
Author: Yicong Huang <[email protected]>
AuthorDate: Sat Apr 25 18:40:39 2026 -0700
test(amber-python): add unit tests for Udon debugger (#4510)
### What changes were proposed in this PR?
Adds the first Python-side unit tests for the Udon UDF debugger:
- **`test_debug_manager.py`** (6 cases) — covers `DebugManager`'s init
state, command/event pipe checks, `prompt == ""`, flushed-output
round-trip via `Pdb.stdout`, and independence of the command vs event
pipes.
- **`test_debug_command_handler.py`** (7 cases) — covers
`WorkerDebugCommandHandler.translate_debug_command`: `b`/`break` with
lineno prepends `module:`, condition arg is preserved, no-arg `b` falls
through, non-break commands pass through, and whitespace is stripped.
No production code is touched.
### Any related issues, documentation, discussions?
Closes #4509
### How was this PR tested?
Ran the new tests locally with the project's shared venv:
```
$ python -m pytest core/architecture/managers/test_debug_manager.py \
core/architecture/handlers/control/test_debug_command_handler.py -v
======================== 13 passed in 1.00s ========================
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
---
.../handlers/control/test_debug_command_handler.py | 195 +++++++++++++++++++++
.../architecture/managers/test_debug_manager.py | 116 ++++++++++++
2 files changed, 311 insertions(+)
diff --git a/amber/src/main/python/core/architecture/handlers/control/test_debug_command_handler.py b/amber/src/main/python/core/architecture/handlers/control/test_debug_command_handler.py
new file mode 100644
index 0000000000..f6e3bfa641
--- /dev/null
+++ b/amber/src/main/python/core/architecture/handlers/control/test_debug_command_handler.py
@@ -0,0 +1,195 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+from types import SimpleNamespace
+from unittest.mock import MagicMock
+
+import pytest
+
+from core.architecture.handlers.control.debug_command_handler import (
+ WorkerDebugCommandHandler,
+)
+from core.architecture.managers.pause_manager import PauseType
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+ DebugCommandRequest,
+ EmptyReturn,
+)
+
+
+class TestTranslateDebugCommand:
+ @pytest.fixture
+ def context(self):
+ return SimpleNamespace(
+ executor_manager=SimpleNamespace(operator_module_name="my_udf")
+ )
+
+ def test_break_with_lineno_prepends_module(self, context):
+ assert (
+ WorkerDebugCommandHandler.translate_debug_command("b 5", context)
+ == "b my_udf:5"
+ )
+
+ def test_long_break_with_lineno_prepends_module(self, context):
+ assert (
+ WorkerDebugCommandHandler.translate_debug_command("break 12", context)
+ == "break my_udf:12"
+ )
+
+ def test_break_preserves_condition_arg(self, context):
+ assert (
+ WorkerDebugCommandHandler.translate_debug_command("b 7 x > 0", context)
+ == "b my_udf:7 x > 0"
+ )
+
+ def test_break_with_no_args_passes_through(self, context):
+ # No args → falls through to the else branch (no module rewriting).
+ assert WorkerDebugCommandHandler.translate_debug_command("b", context) == "b"
+
+ def test_non_break_command_passes_through(self, context):
+ assert WorkerDebugCommandHandler.translate_debug_command("n", context) == "n"
+
+ def test_non_break_command_with_args_is_rejoined(self, context):
+ assert (
+ WorkerDebugCommandHandler.translate_debug_command("p some_var", context)
+ == "p some_var"
+ )
+
+ def test_leading_and_trailing_whitespace_is_stripped(self, context):
+ assert (
+ WorkerDebugCommandHandler.translate_debug_command(" c ", context) == "c"
+ )
+
+ def test_internal_whitespace_is_collapsed_to_single_space(self, context):
+ # split() with no args collapses any run of whitespace, so the rejoined
+ # form has single spaces regardless of how many the user typed.
+ assert (
+ WorkerDebugCommandHandler.translate_debug_command("p foo bar", context)
+ == "p foo bar"
+ )
+
+ def test_break_with_only_lineno_has_no_trailing_space(self, context):
+ # The implementation joins the (empty) tail with " "; the final strip()
+ # must remove the trailing whitespace so the command stays valid pdb.
+ result = WorkerDebugCommandHandler.translate_debug_command("b 5", context)
+ assert result == "b my_udf:5"
+ assert not result.endswith(" ")
+
+ # ----- edge cases / invalid input -----
+
+ def test_empty_command_raises_value_error(self, context):
+ # `command.strip().split()` on "" returns [], so the unpacking
+ # debug_command, *debug_args = ...
+ # raises ValueError. The handler does not guard against this — the
+ # frontend is expected to never send empty commands. Pin the current
+ # behavior so any future guard is a deliberate change.
+ with pytest.raises(ValueError):
+ WorkerDebugCommandHandler.translate_debug_command("", context)
+
+ def test_whitespace_only_command_raises_value_error(self, context):
+ with pytest.raises(ValueError):
+ WorkerDebugCommandHandler.translate_debug_command(" \t ", context)
+
+ def test_uppercase_break_is_not_recognized(self, context):
+ # The match list is case-sensitive: ["b", "break"]. "BREAK" / "B" fall
+ # through to the pass-through branch and won't get the module prefix.
+ assert (
+ WorkerDebugCommandHandler.translate_debug_command("BREAK 5", context)
+ == "BREAK 5"
+ )
+ assert (
+ WorkerDebugCommandHandler.translate_debug_command("B 5", context) == "B 5"
+ )
+
+ def test_break_with_function_name_is_also_module_prefixed(self, context):
+ # pdb's `b` accepts either a lineno or a function name. The
+ # translation prefixes the module unconditionally; document that.
+ assert (
+ WorkerDebugCommandHandler.translate_debug_command("b my_func", context)
+ == "b my_udf:my_func"
+ )
+
+ def test_break_with_explicit_filename_is_re_prefixed(self, context):
+ # If the user already typed `b foo.py:5`, the translator naively
+ # prepends the module again, yielding `b my_udf:foo.py:5`. Pin this.
+ assert (
+ WorkerDebugCommandHandler.translate_debug_command("b foo.py:5", context)
+ == "b my_udf:foo.py:5"
+ )
+
+ def test_module_name_none_is_rendered_as_string_none(self, context):
+ # If the executor hasn't been initialized yet, operator_module_name is
+ # None; the f-string interpolates it as the literal "None". The
+ # frontend isn't expected to send debug commands in this state, but
+ # if it does, this is what comes out.
+ context.executor_manager.operator_module_name = None
+ assert (
+ WorkerDebugCommandHandler.translate_debug_command("b 5", context)
+ == "b None:5"
+ )
+
+
+class TestDebugCommandAsyncFlow:
+ @pytest.fixture
+ def handler(self):
+ # ControlHandler.__init__ just stashes context; bypass the protobuf
+ # base class' __init__ by constructing via __new__.
+ instance = WorkerDebugCommandHandler.__new__(WorkerDebugCommandHandler)
+ instance.context = SimpleNamespace(
+ executor_manager=SimpleNamespace(operator_module_name="my_udf"),
+ debug_manager=MagicMock(),
+ pause_manager=MagicMock(),
+ )
+ return instance
+
+ def test_translates_then_forwards_to_debug_manager(self, handler):
+ asyncio.run(handler.debug_command(DebugCommandRequest(cmd="b 5")))
+ handler.context.debug_manager.put_debug_command.assert_called_once_with(
+ "b my_udf:5"
+ )
+
+ def test_resumes_all_three_pause_types(self, handler):
+ asyncio.run(handler.debug_command(DebugCommandRequest(cmd="c")))
+ actual = [
+ call.args[0] for call in handler.context.pause_manager.resume.call_args_list
+ ]
+ assert actual == [
+ PauseType.USER_PAUSE,
+ PauseType.EXCEPTION_PAUSE,
+ PauseType.DEBUG_PAUSE,
+ ]
+
+ def test_returns_empty_return(self, handler):
+ result = asyncio.run(handler.debug_command(DebugCommandRequest(cmd="n")))
+ assert isinstance(result, EmptyReturn)
+
+ def test_passes_through_non_break_command_unchanged(self, handler):
+ asyncio.run(handler.debug_command(DebugCommandRequest(cmd="p x")))
+ handler.context.debug_manager.put_debug_command.assert_called_once_with("p x")
+
+ def test_empty_cmd_propagates_value_error(self, handler):
+ # An empty cmd hits the ValueError in translate_debug_command. The
+ # handler does not catch it — the RPC layer will surface the failure
+ # back to the caller. Pin this so silent swallowing doesn't sneak in.
+ with pytest.raises(ValueError):
+ asyncio.run(handler.debug_command(DebugCommandRequest(cmd="")))
+
+ def test_translation_failure_skips_put_and_resume(self, handler):
+ with pytest.raises(ValueError):
+ asyncio.run(handler.debug_command(DebugCommandRequest(cmd="")))
+ handler.context.debug_manager.put_debug_command.assert_not_called()
+ handler.context.pause_manager.resume.assert_not_called()
diff --git a/amber/src/main/python/core/architecture/managers/test_debug_manager.py b/amber/src/main/python/core/architecture/managers/test_debug_manager.py
new file mode 100644
index 0000000000..248a2e134e
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_debug_manager.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from threading import Condition
+
+import pytest
+
+from core.architecture.managers.debug_manager import DebugManager
+
+
+class TestDebugManager:
+ @pytest.fixture
+ def debug_manager(self):
+ return DebugManager(Condition())
+
+ def test_it_can_init(self, debug_manager):
+ assert debug_manager.debugger is not None
+ assert debug_manager.debugger.prompt == ""
+
+ def test_it_has_no_command_initially(self, debug_manager):
+ assert not debug_manager.has_debug_command()
+
+ def test_it_has_no_event_initially(self, debug_manager):
+ assert not debug_manager.has_debug_event()
+
+ def test_put_command_sets_has_debug_command(self, debug_manager):
+ debug_manager.put_debug_command("n")
+ assert debug_manager.has_debug_command()
+
+ def test_get_debug_event_returns_flushed_output(self, debug_manager):
+ # Pdb writes to its stdout via the SingleBlockingIO; simulate that path
+ # directly so we don't have to spin up a real debugging session.
+ debug_manager.debugger.stdout.write("hit breakpoint")
+ debug_manager.debugger.stdout.flush()
+ assert debug_manager.has_debug_event()
+ assert debug_manager.get_debug_event() == "hit breakpoint\n"
+ assert not debug_manager.has_debug_event()
+
+ def test_command_pipe_and_event_pipe_are_independent(self, debug_manager):
+ debug_manager.put_debug_command("step")
+ assert debug_manager.has_debug_command()
+ assert not debug_manager.has_debug_event()
+
+ debug_manager.debugger.stdout.write("event")
+ debug_manager.debugger.stdout.flush()
+ # Putting a command must not consume an event, and vice versa.
+ assert debug_manager.has_debug_command()
+ assert debug_manager.has_debug_event()
+
+ def test_pdb_is_wired_to_debug_pipes(self, debug_manager):
+ # The Pdb instance must read from the same IO that put_debug_command
+ # writes to, and write to the same IO that get_debug_event reads from.
+ debug_manager.put_debug_command("c")
+ # Reading via the debugger's stdin must see the queued command.
+ assert debug_manager.debugger.stdin.readline() == "c\n"
+
+ debug_manager.debugger.stdout.write("paused")
+ debug_manager.debugger.stdout.flush()
+ assert debug_manager.get_debug_event() == "paused\n"
+
+ def test_event_pipe_supports_multiple_round_trips(self, debug_manager):
+ for line in ("first", "second", "third"):
+ debug_manager.debugger.stdout.write(line)
+ debug_manager.debugger.stdout.flush()
+ assert debug_manager.get_debug_event() == f"{line}\n"
+ assert not debug_manager.has_debug_event()
+
+ def test_debugger_uses_nosigint_to_avoid_signal_install(self, debug_manager):
+ # We construct Pdb with nosigint=True to avoid touching signal handlers
+ # in the worker thread. Guard against accidental flips.
+ assert debug_manager.debugger.nosigint is True
+
+ # ----- edge cases / quirks -----
+
+ def test_put_empty_command_still_marks_command_present(self, debug_manager):
+ # SingleBlockingIO.flush always commits buf + "\n" to value, so even
+ # an empty command becomes a "\n" line and shows up as a pending
+ # command. Documents current behavior.
+ debug_manager.put_debug_command("")
+ assert debug_manager.has_debug_command()
+ assert debug_manager.debugger.stdin.readline() == "\n"
+
+ def test_put_overwrites_unconsumed_command(self, debug_manager):
+ # The command pipe holds at most one value. A second put without an
+ # intervening consume silently overwrites the first — known data-loss
+ # quirk of SingleBlockingIO. Pinning this so callers don't accidentally
+ # rely on queued semantics.
+ debug_manager.put_debug_command("first")
+ debug_manager.put_debug_command("second")
+ assert debug_manager.debugger.stdin.readline() == "second\n"
+
+ def test_put_command_with_embedded_newline_is_passed_verbatim(self, debug_manager):
+ # An embedded newline is not sanitized; pdb would see the raw bytes.
+ debug_manager.put_debug_command("step\nlist")
+ assert debug_manager.debugger.stdin.readline() == "step\nlist\n"
+
+ def test_event_pipe_overwrites_unconsumed_event(self, debug_manager):
+ debug_manager.debugger.stdout.write("first")
+ debug_manager.debugger.stdout.flush()
+ debug_manager.debugger.stdout.write("second")
+ debug_manager.debugger.stdout.flush()
+ assert debug_manager.get_debug_event() == "second\n"