pvardanis opened a new issue, #44132:
URL: https://github.com/apache/arrow/issues/44132
### Describe the usage question you have. Please include as many useful
details as possible.
I have a `do_exchange()` method that needs to interact with `async` code
that's an external dependency where I don't have any control of it whatsoever.
Unfortunately, my understanding around concurrency/multi-threading isn't the
best. I do know though that mixing multi-threaded with `async` code isn't a
recommended approach, if there's any alternative please let me know.
I'm creating a new event loop inside the `do_exchange` method, so when
multiple requests arrive the server spawns up multiple threads as expected, and
each thread should (?) have its own event loop. Whenever the external `async`
code is called, I'm using an async lock to make sure no multiple threads access
it simultaneously since it has stateful variables that I don't want to change
at the same time (and it's not thread-safe anyways).
```python
def do_exchange(
self,
context: flight.ServerCallContext,
descriptor: flight.FlightDescriptor,
reader: flight.FlightStreamReader,
writer: flight.FlightStreamWriter,
) -> None:
"""This method implements the `do_exchange` method of the
FlightServerBase
class.
:param context: A ServerCallContext object.
:param descriptor: A FlightDescriptor object.
:param reader: A FlightStreamReader object.
:param writer: A FlightStreamWriter object.
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
is_first_batch = True
while True:
logger.info("Processing data...")
(writer, reader, is_first_batch) = loop.run_until_complete(
self._run_inference_and_write_to_stream(writer, reader,
is_first_batch),
)
logger.info("Output data ready to be consumed.")
```
however when two client requests arrive at the same time I'm getting:
```
pyarrow._flight.FlightServerError: Task <Task pending name='Task-2'
coro=<ArrowFlightAsyncServer._run_inference_and_write_to_stream() running at
/Users/panagiotisvardanis/Documents/wallaroo/projects/platform/conductor/model-auto-conversion/mac/mac/service/arrow_flight/async_server.py:157>
cb=[_run_until_complete_cb() at
/Users/panagiotisvardanis/.pyenv/versions/3.8.19/lib/python3.8/asyncio/base_events.py:184]>
got Future <Future pending> attached to a different loop. Detail: Python
exception: RuntimeError. gRPC client debug context: UNKNOWN:Error received from
peer ipv4:0.0.0.0:8080 {created_time:"2024-09-16T15:44:17.886048+02:00",
grpc_status:2, grpc_message:"Task <Task pending name=\'Task-2\'
coro=<ArrowFlightAsyncServer._run_inference_and_write_to_stream() running at
/Users/panagiotisvardanis/Documents/wallaroo/projects/platform/conductor/model-auto-conversion/mac/mac/service/arrow_flight/async_server.py:157>
cb=[_run_until_complete_cb() at /Users/panagiotisvardanis/.pyenv/vers
ions/3.8.19/lib/python3.8/asyncio/base_events.py:184]> got Future <Future
pending> attached to a different loop. Detail: Python exception:
RuntimeError"}. Client context: OK
```
### Component(s)
FlightRPC, Python
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]