This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 654cdf4  ARROW-14255: [Python] Fix FlightClient.do_action
654cdf4 is described below

commit 654cdf4b557338b80d3e7d4adcc2ff786f5e6502
Author: Alessandro Molina <[email protected]>
AuthorDate: Thu Oct 14 10:42:47 2021 +0200

    ARROW-14255: [Python] Fix FlightClient.do_action
    
    Closes #11354 from amol-/ARROW-14255
    
    Authored-by: Alessandro Molina <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 python/pyarrow/_flight.pyx          | 23 +++++++++++++++--------
 python/pyarrow/tests/test_flight.py | 27 +++++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
index e6d38bd..5b00c53 100644
--- a/python/pyarrow/_flight.pyx
+++ b/python/pyarrow/_flight.pyx
@@ -1285,7 +1285,6 @@ cdef class FlightClient(_Weakrefable):
         """
         cdef:
             unique_ptr[CResultStream] results
-            Result result
             CFlightCallOptions* c_options = FlightCallOptions.unwrap(options)
 
         if isinstance(action, (str, bytes)):
@@ -1300,13 +1299,18 @@ cdef class FlightClient(_Weakrefable):
             check_flight_status(
                 self.client.get().DoAction(
                     deref(c_options), c_action, &results))
-        while True:
-            result = Result.__new__(Result)
-            with nogil:
-                check_flight_status(results.get().Next(&result.result))
-                if result.result == NULL:
-                    break
-            yield result
+
+        def _do_action_response():
+            cdef:
+                Result result
+            while True:
+                result = Result.__new__(Result)
+                with nogil:
+                    check_flight_status(results.get().Next(&result.result))
+                    if result.result == NULL:
+                        break
+                yield result
+        return _do_action_response()
 
     def list_flights(self, criteria: bytes = None,
                      options: FlightCallOptions = None):
@@ -1950,6 +1954,9 @@ cdef CStatus _do_action(void* self, const CServerCallContext& context,
         return (<FlightError> flight_error).to_status()
     # Let the application return an iterator or anything convertible
     # into one
+    if responses is None:
+        # Server didn't return anything
+        responses = []
     result.reset(new CPyFlightResultStream(iter(responses), ptr))
     return CStatus_OK()
 
diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py
index abfdfb1..5c40467 100644
--- a/python/pyarrow/tests/test_flight.py
+++ b/python/pyarrow/tests/test_flight.py
@@ -25,6 +25,7 @@ import tempfile
 import threading
 import time
 import traceback
+import json
 
 import numpy as np
 import pytest
@@ -2018,3 +2019,29 @@ def test_large_metadata_client():
             writer, reader = client.do_exchange(descriptor)
             with writer:
                 reader.read_all()
+
+
+class ActionNoneFlightServer(EchoFlightServer):
+    """A server that implements a side effect to a non iterable action."""
+    VALUES = []
+
+    def do_action(self, context, action):
+        if action.type == "get_value":
+            return [json.dumps(self.VALUES).encode('utf-8')]
+        elif action.type == "append":
+            self.VALUES.append(True)
+            return None
+        raise NotImplementedError
+
+
+def test_none_action_side_effect():
+    """Ensure that actions are executed even when we don't consume iterator.
+
+    See https://issues.apache.org/jira/browse/ARROW-14255
+    """
+
+    with ActionNoneFlightServer() as server:
+        client = FlightClient(('localhost', server.port))
+        client.do_action(flight.Action("append", b""))
+        r = client.do_action(flight.Action("get_value", b""))
+        assert json.loads(next(r).body.to_pybytes()) == [True]

Reply via email to