This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 654cdf4 ARROW-14255: [Python] Fix FlightClient.do_action
654cdf4 is described below
commit 654cdf4b557338b80d3e7d4adcc2ff786f5e6502
Author: Alessandro Molina <[email protected]>
AuthorDate: Thu Oct 14 10:42:47 2021 +0200
ARROW-14255: [Python] Fix FlightClient.do_action
Closes #11354 from amol-/ARROW-14255
Authored-by: Alessandro Molina <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
python/pyarrow/_flight.pyx | 23 +++++++++++++++--------
python/pyarrow/tests/test_flight.py | 27 +++++++++++++++++++++++++++
2 files changed, 42 insertions(+), 8 deletions(-)
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
index e6d38bd..5b00c53 100644
--- a/python/pyarrow/_flight.pyx
+++ b/python/pyarrow/_flight.pyx
@@ -1285,7 +1285,6 @@ cdef class FlightClient(_Weakrefable):
"""
cdef:
unique_ptr[CResultStream] results
- Result result
CFlightCallOptions* c_options = FlightCallOptions.unwrap(options)
if isinstance(action, (str, bytes)):
@@ -1300,13 +1299,18 @@ cdef class FlightClient(_Weakrefable):
check_flight_status(
self.client.get().DoAction(
deref(c_options), c_action, &results))
- while True:
- result = Result.__new__(Result)
- with nogil:
- check_flight_status(results.get().Next(&result.result))
- if result.result == NULL:
- break
- yield result
+
+ def _do_action_response():
+ cdef:
+ Result result
+ while True:
+ result = Result.__new__(Result)
+ with nogil:
+ check_flight_status(results.get().Next(&result.result))
+ if result.result == NULL:
+ break
+ yield result
+ return _do_action_response()
def list_flights(self, criteria: bytes = None,
options: FlightCallOptions = None):
@@ -1950,6 +1954,9 @@ cdef CStatus _do_action(void* self, const
CServerCallContext& context,
return (<FlightError> flight_error).to_status()
# Let the application return an iterator or anything convertible
# into one
+ if responses is None:
+ # Server didn't return anything
+ responses = []
result.reset(new CPyFlightResultStream(iter(responses), ptr))
return CStatus_OK()
diff --git a/python/pyarrow/tests/test_flight.py
b/python/pyarrow/tests/test_flight.py
index abfdfb1..5c40467 100644
--- a/python/pyarrow/tests/test_flight.py
+++ b/python/pyarrow/tests/test_flight.py
@@ -25,6 +25,7 @@ import tempfile
import threading
import time
import traceback
+import json
import numpy as np
import pytest
@@ -2018,3 +2019,29 @@ def test_large_metadata_client():
writer, reader = client.do_exchange(descriptor)
with writer:
reader.read_all()
+
+
+class ActionNoneFlightServer(EchoFlightServer):
+ """A server that implements a side effect to a non iterable action."""
+ VALUES = []
+
+ def do_action(self, context, action):
+ if action.type == "get_value":
+ return [json.dumps(self.VALUES).encode('utf-8')]
+ elif action.type == "append":
+ self.VALUES.append(True)
+ return None
+ raise NotImplementedError
+
+
+def test_none_action_side_effect():
+ """Ensure that actions are executed even when we don't consume iterator.
+
+ See https://issues.apache.org/jira/browse/ARROW-14255
+ """
+
+ with ActionNoneFlightServer() as server:
+ client = FlightClient(('localhost', server.port))
+ client.do_action(flight.Action("append", b""))
+ r = client.do_action(flight.Action("get_value", b""))
+ assert json.loads(next(r).body.to_pybytes()) == [True]