lidavidm commented on code in PR #36986:
URL: https://github.com/apache/arrow/pull/36986#discussion_r1298790037


##########
python/pyarrow/_flight.pyx:
##########
@@ -1320,6 +1410,11 @@ cdef class FlightClient(_Weakrefable):
             check_flight_status(CFlightClient.Connect(c_location, c_options
                                                       ).Value(&self.client))
 
+    def as_async(self) -> None:
+        if self._pool is None:
+            self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+        return AsyncioFlightClient(self, self._pool)

Review Comment:
   It's not, I forgot to remove this after trying something



##########
python/pyarrow/_flight.pyx:
##########
@@ -1219,6 +1220,93 @@ cdef class FlightMetadataWriter(_Weakrefable):
             check_flight_status(self.writer.get().WriteMetadata(deref(buf)))
 
 
+class AsyncioCall:
+    """State for an async RPC using asyncio."""
+
+    def __init__(self) -> None:
+        import asyncio
+
+        # Python waits on the event.
+        self._event = asyncio.Event()
+        # The C++ callback puts a task on the pool to set the event, waking up
+        # Python.
+        # The result of the async call.
+        # TODO(lidavidm): how best to handle streams?
+        self._result = None
+        # The error raised by the async call.
+        self._exception = None
+        self._loop = asyncio.get_running_loop()
+
+    async def wait(self) -> object:
+        """Wait for the RPC call to finish."""
+        await self._event.wait()
+        if self._exception:
+            raise self._exception
+        return self._result
+
+    def wakeup(self, *, result=None, exception=None) -> None:
+        """Finish the RPC call."""
+        self._result = result
+        self._exception = exception
+        # Set the event from within the loop to avoid a race (asyncio
+        # objects are not necessarily thread-safe)
+        self._loop.call_soon_threadsafe(lambda: self._event.set())
+
+
+cdef class AsyncioFlightClient:
+    """
+    A FlightClient with an asyncio-based async interface.
+
+    This interface is EXPERIMENTAL.
+    """
+
+    cdef:
+        FlightClient _client
+
+    def __init__(self, FlightClient client, pool) -> None:
+        self._client = client
+
+    async def get_flight_info(
+        self,
+        descriptor: FlightDescriptor,
+        *,
+        options: FlightCallOptions = None,
+    ):
+        call = AsyncioCall()
+        self._get_flight_info(call, descriptor, options)
+        return await call.wait()
+
+    cdef _get_flight_info(self, call, descriptor, options):
+        cdef:
+            CFlightCallOptions* c_options = \
+                FlightCallOptions.unwrap(options)
+            CFlightDescriptor c_descriptor = \
+                FlightDescriptor.unwrap(descriptor)
+            function[cb_client_async_get_flight_info] callback = \
+                &_client_async_get_flight_info
+
+        with nogil:
+            CAsyncGetFlightInfo(
+                self._client.client.get(),
+                deref(c_options),
+                c_descriptor,
+                call,
+                callback,
+            )
+
+
+cdef void _client_async_get_flight_info(void* self, CFlightInfo* info, const 
CStatus& status) except *:
+    """Bridge the C++ async call with the Python side."""
+    cdef:
+        FlightInfo result = FlightInfo.__new__(FlightInfo)
+    call: AsyncioCall = <object> self
+    if status.ok():
+        result.info.reset(new CFlightInfo(move(deref(info))))
+        call.wakeup(result=result)
+    else:
+        call.wakeup(exception=convert_status(status))

Review Comment:
   Thanks! Applied the patch + added an async.h as noted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to