lidavidm commented on code in PR #36986:
URL: https://github.com/apache/arrow/pull/36986#discussion_r1289082136


##########
python/pyarrow/_flight.pyx:
##########
@@ -1219,6 +1219,94 @@ cdef class FlightMetadataWriter(_Weakrefable):
             check_flight_status(self.writer.get().WriteMetadata(deref(buf)))
 
 
+class AsyncioCall:
+    """State for an async RPC using asyncio."""
+
+    def __init__(self) -> None:
+        import asyncio
+
+        # Python waits on the event. The C++ callback sets the event,
+        # waking up the original task.
+        self._event = asyncio.Event()
+        # The result of the async call.
+        # TODO(lidavidm): how best to handle streams?
+        self._result = None
+        # The error raised by the async call.
+        self._exception = None
+        self._loop = asyncio.get_running_loop()
+
+    async def wait(self) -> object:
+        """Wait for the RPC call to finish."""
+        await self._event.wait()
+        if self._exception:
+            raise self._exception
+        return self._result
+
+    def wakeup(self, *, exception=None, result=None) -> None:
+        """Finish the RPC call."""
+        if exception:
+            self._exception = exception
+        else:
+            self._result = result
+        # XXX: why does set() have to be done from within the loop?

Review Comment:
   Also, I wonder if we can just directly complete an asyncio.Future (though I 
am not sure if that generalizes to Trio, or to streaming RPCs)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to