This is an automated email from the ASF dual-hosted git repository.

aicam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new e4ec881208 fix: order shutdown after Result yield in 
ProxyServer.do_action (#5076)
e4ec881208 is described below

commit e4ec881208162adfe9cd7226fef3677a9d676a6f
Author: Matthew B. <[email protected]>
AuthorDate: Fri May 15 16:33:48 2026 -0700

    fix: order shutdown after Result yield in ProxyServer.do_action (#5076)
    
    ### What changes were proposed in this PR?
    On Python 3.13, the
    `core/proxy/test_proxy_client.py::test_client_can_shutdown_server` test
    intermittently failed with `pyarrow._flight.FlightUnavailableError:
    Broken pipe` because the registered `shutdown` action spawned a thread
    that called `super().shutdown()` while pyarrow was still flushing the
    `"Bye bye!"` reply. The handler is now a no-op, and
    `ProxyServer.do_action` launches the shutdown thread only after `yield
    Result(...)`, so the reply is handed to gRPC before the listener closes.
    This is a causal fix, not a timing buffer, and works on every supported
    Python version.
      ### Any related issues, documentation, or discussions?
      Closes: #4650
      ### How was this PR tested?
    Ran `pytest src/test/python/core/proxy/` on Python 3.13 (11/11 passing)
    and repeated the previously flaky `test_client_can_shutdown_server` 10
    consecutive times with no failures; `ruff check` and `ruff format
    --check` over `amber/src/{main, test}/python` both pass, matching CI.
      ### Was this PR authored or co-authored using generative AI tooling?
      Co-authored with Claude Opus 4.7 in compliance with ASF
---
 amber/src/main/python/core/proxy/proxy_server.py      | 10 ++++++----
 amber/src/test/python/core/proxy/test_proxy_server.py | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/amber/src/main/python/core/proxy/proxy_server.py 
b/amber/src/main/python/core/proxy/proxy_server.py
index b3bc3afea0..0dee156bbf 100644
--- a/amber/src/main/python/core/proxy/proxy_server.py
+++ b/amber/src/main/python/core/proxy/proxy_server.py
@@ -126,12 +126,10 @@ class ProxyServer(FlightServerBase):
         self.register(name="heartbeat", action=ProxyServer.ack()(lambda: None))
 
         # register shutdown, this is the default action for the client to
-        # terminate the server.
+        # terminate the server after the "Bye bye!" Result has been yielded.
         self.register(
             name="shutdown",
-            action=ProxyServer.ack(msg="Bye bye!")(
-                lambda: threading.Thread(target=self.graceful_shutdown).start()
-            ),
+            action=ProxyServer.ack(msg="Bye bye!")(lambda: None),
             description="Shut down this server.",
         )
 
@@ -251,6 +249,10 @@ class ProxyServer(FlightServerBase):
             else:
                 encoded = str(result).encode("utf-8")
             yield Result(py_buffer(encoded))
+
+            # For "shutdown", tear the server down only after the Result has 
been yielded
+            if action_name == "shutdown":
+                threading.Thread(target=self.graceful_shutdown).start()
         else:
             raise KeyError("Unknown action {!r}".format(action_name))
 
diff --git a/amber/src/test/python/core/proxy/test_proxy_server.py 
b/amber/src/test/python/core/proxy/test_proxy_server.py
index 22aec0cc69..3055ce0c64 100644
--- a/amber/src/test/python/core/proxy/test_proxy_server.py
+++ b/amber/src/test/python/core/proxy/test_proxy_server.py
@@ -15,6 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import threading
+from unittest.mock import patch
+
 import pytest
 from pyarrow.flight import Action
 
@@ -66,3 +69,19 @@ class TestProxyServer:
             assert next(
                 server.do_action(None, Action(name, b""))
             ).body.to_pybytes() == str(result).encode("utf-8")
+
+    def test_shutdown_action_yields_reply_before_starting_shutdown(self, 
server):
+        shutdown_started = threading.Event()
+        with patch.object(
+            server, "graceful_shutdown", side_effect=shutdown_started.set
+        ) as mock_shutdown:
+            results = server.do_action(None, Action("shutdown", b""))
+
+            first = next(results)
+            assert first.body.to_pybytes() == b"Bye bye!"
+            assert not mock_shutdown.called
+
+            with pytest.raises(StopIteration):
+                next(results)
+            assert shutdown_started.wait(timeout=5)
+            mock_shutdown.assert_called_once()

Reply via email to