pitrou commented on code in PR #36986: URL: https://github.com/apache/arrow/pull/36986#discussion_r1297339562
########## python/pyarrow/_flight.pyx: ########## @@ -1320,6 +1410,11 @@ cdef class FlightClient(_Weakrefable): check_flight_status(CFlightClient.Connect(c_location, c_options ).Value(&self.client)) + def as_async(self) -> None: + if self._pool is None: + self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + return AsyncioFlightClient(self, self._pool) Review Comment: The pool isn't used anywhere, right? Why is it necessary? ########## python/pyarrow/_flight.pyx: ########## @@ -1219,6 +1220,93 @@ cdef class FlightMetadataWriter(_Weakrefable): check_flight_status(self.writer.get().WriteMetadata(deref(buf))) +class AsyncioCall: + """State for an async RPC using asyncio.""" + + def __init__(self) -> None: + import asyncio + + # Python waits on the event. + self._event = asyncio.Event() + # The C++ callback puts a task on the pool to set the event, waking up + # Python. + # The result of the async call. + # TODO(lidavidm): how best to handle streams? Review Comment: Using a `list` and an `asyncio.Condition` perhaps? Or a `Queue` and an `asyncio.Event`. ########## python/pyarrow/_flight.pyx: ########## @@ -1219,6 +1220,93 @@ cdef class FlightMetadataWriter(_Weakrefable): check_flight_status(self.writer.get().WriteMetadata(deref(buf))) +class AsyncioCall: + """State for an async RPC using asyncio.""" + + def __init__(self) -> None: + import asyncio + + # Python waits on the event. + self._event = asyncio.Event() + # The C++ callback puts a task on the pool to set the event, waking up + # Python. + # The result of the async call. + # TODO(lidavidm): how best to handle streams? + self._result = None + # The error raised by the async call. 
+ self._exception = None + self._loop = asyncio.get_running_loop() + + async def wait(self) -> object: + """Wait for the RPC call to finish.""" + await self._event.wait() + if self._exception: + raise self._exception + return self._result + + def wakeup(self, *, result=None, exception=None) -> None: + """Finish the RPC call.""" + self._result = result + self._exception = exception + # Set the event from within the loop to avoid a race (asyncio + # objects are not necessarily thread-safe) + self._loop.call_soon_threadsafe(lambda: self._event.set()) + + +cdef class AsyncioFlightClient: + """ + A FlightClient with an asyncio-based async interface. + + This interface is EXPERIMENTAL. + """ + + cdef: + FlightClient _client + + def __init__(self, FlightClient client, pool) -> None: + self._client = client + + async def get_flight_info( + self, + descriptor: FlightDescriptor, + *, + options: FlightCallOptions = None, + ): + call = AsyncioCall() + self._get_flight_info(call, descriptor, options) + return await call.wait() + + cdef _get_flight_info(self, call, descriptor, options): + cdef: + CFlightCallOptions* c_options = \ + FlightCallOptions.unwrap(options) + CFlightDescriptor c_descriptor = \ + FlightDescriptor.unwrap(descriptor) + function[cb_client_async_get_flight_info] callback = \ + &_client_async_get_flight_info + + with nogil: + CAsyncGetFlightInfo( + self._client.client.get(), + deref(c_options), + c_descriptor, + call, + callback, + ) + + +cdef void _client_async_get_flight_info(void* self, CFlightInfo* info, const CStatus& status) except *: + """Bridge the C++ async call with the Python side.""" + cdef: + FlightInfo result = FlightInfo.__new__(FlightInfo) + call: AsyncioCall = <object> self + if status.ok(): + result.info.reset(new CFlightInfo(move(deref(info)))) + call.wakeup(result=result) + else: + call.wakeup(exception=convert_status(status)) Review Comment: I've sketched something that seems to work here: 
https://gist.github.com/pitrou/d4e5a386bc62d05fda6593f1b1e83a47 ########## python/pyarrow/_flight.pyx: ########## @@ -18,6 +18,7 @@ # cython: language_level = 3 import collections +import concurrent.futures Review Comment: Remove this if you remove the pool. ########## python/pyarrow/_flight.pyx: ########## @@ -1219,6 +1220,93 @@ cdef class FlightMetadataWriter(_Weakrefable): check_flight_status(self.writer.get().WriteMetadata(deref(buf))) +class AsyncioCall: + """State for an async RPC using asyncio.""" + + def __init__(self) -> None: + import asyncio + + # Python waits on the event. + self._event = asyncio.Event() + # The C++ callback puts a task on the pool to set the event, waking up + # Python. + # The result of the async call. + # TODO(lidavidm): how best to handle streams? + self._result = None + # The error raised by the async call. + self._exception = None + self._loop = asyncio.get_running_loop() + + async def wait(self) -> object: + """Wait for the RPC call to finish.""" + await self._event.wait() + if self._exception: + raise self._exception + return self._result + + def wakeup(self, *, result=None, exception=None) -> None: + """Finish the RPC call.""" + self._result = result + self._exception = exception + # Set the event from within the loop to avoid a race (asyncio + # objects are not necessarily thread-safe) + self._loop.call_soon_threadsafe(lambda: self._event.set()) + + +cdef class AsyncioFlightClient: + """ + A FlightClient with an asyncio-based async interface. + + This interface is EXPERIMENTAL. 
+ """ + + cdef: + FlightClient _client + + def __init__(self, FlightClient client, pool) -> None: + self._client = client + + async def get_flight_info( + self, + descriptor: FlightDescriptor, + *, + options: FlightCallOptions = None, + ): + call = AsyncioCall() + self._get_flight_info(call, descriptor, options) + return await call.wait() + + cdef _get_flight_info(self, call, descriptor, options): + cdef: + CFlightCallOptions* c_options = \ + FlightCallOptions.unwrap(options) + CFlightDescriptor c_descriptor = \ + FlightDescriptor.unwrap(descriptor) + function[cb_client_async_get_flight_info] callback = \ + &_client_async_get_flight_info + + with nogil: + CAsyncGetFlightInfo( + self._client.client.get(), + deref(c_options), + c_descriptor, + call, + callback, + ) + + +cdef void _client_async_get_flight_info(void* self, CFlightInfo* info, const CStatus& status) except *: + """Bridge the C++ async call with the Python side.""" + cdef: + FlightInfo result = FlightInfo.__new__(FlightInfo) + call: AsyncioCall = <object> self + if status.ok(): + result.info.reset(new CFlightInfo(move(deref(info)))) + call.wakeup(result=result) + else: + call.wakeup(exception=convert_status(status)) Review Comment: This is fine as a prototype, but isn't it going to be a lot of boilerplate if we need to implement a separate Cython callback and C++ wrapper for each async method? Presumably, what we need is a way of binding an Arrow C++ `Future` to an arbitrary Python (or Cython-defined) callback. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org