pvardanis opened a new issue, #44132:
URL: https://github.com/apache/arrow/issues/44132

   ### Describe the usage question you have. Please include as many useful 
details as  possible.
   
   
   I have a `do_exchange()` method that needs to interact with `async` code 
that's an external dependency where I don't have any control of it whatsoever. 
Unfortunately, my understanding around concurrency/multi-threading isn't the 
best. I do know though that mixing multi-threaded with `async` code isn't a 
recommended approach, if there's any alternative please let me know. 
   
   I'm creating a new event loop inside the `do_exchange` method, so when 
multiple requests arrive the server spawns up multiple threads as expected, and 
each thread should (?) have its own event loop. Whenever the external `async` 
code is called, I'm using an async lock to make sure no multiple threads access 
it simultaneously since it has stateful variables that I don't want to change 
at the same time (and it's not thread-safe anyways).
   ```python
   def do_exchange(
           self,
           context: flight.ServerCallContext,
           descriptor: flight.FlightDescriptor,
           reader: flight.FlightStreamReader,
           writer: flight.FlightStreamWriter,
       ) -> None:
           """This method implements the `do_exchange` method of the 
FlightServerBase
           class.
   
           :param context: A ServerCallContext object.
           :param descriptor: A FlightDescriptor object.
           :param reader: A FlightStreamReader object.
           :param writer: A FlightStreamWriter object.
           """
           loop = asyncio.new_event_loop()
           asyncio.set_event_loop(loop)
           is_first_batch = True
           while True:
               logger.info("Processing data...")
               (writer, reader, is_first_batch) = loop.run_until_complete(
                   self._run_inference_and_write_to_stream(writer, reader, 
is_first_batch),
               )
               logger.info("Output data ready to be consumed.")
   ```
   however when two client requests arrive at the same time I'm getting:
   ```
   pyarrow._flight.FlightServerError: Task <Task pending name='Task-2' 
coro=<ArrowFlightAsyncServer._run_inference_and_write_to_stream() running at 
/Users/panagiotisvardanis/Documents/wallaroo/projects/platform/conductor/model-auto-conversion/mac/mac/service/arrow_flight/async_server.py:157>
 cb=[_run_until_complete_cb() at 
/Users/panagiotisvardanis/.pyenv/versions/3.8.19/lib/python3.8/asyncio/base_events.py:184]>
 got Future <Future pending> attached to a different loop. Detail: Python 
exception: RuntimeError. gRPC client debug context: UNKNOWN:Error received from 
peer ipv4:0.0.0.0:8080 {created_time:"2024-09-16T15:44:17.886048+02:00", 
grpc_status:2, grpc_message:"Task <Task pending name=\'Task-2\' 
coro=<ArrowFlightAsyncServer._run_inference_and_write_to_stream() running at 
/Users/panagiotisvardanis/Documents/wallaroo/projects/platform/conductor/model-auto-conversion/mac/mac/service/arrow_flight/async_server.py:157>
 cb=[_run_until_complete_cb() at /Users/panagiotisvardanis/.pyenv/vers
 ions/3.8.19/lib/python3.8/asyncio/base_events.py:184]> got Future <Future 
pending> attached to a different loop. Detail: Python exception: 
RuntimeError"}. Client context: OK
   ```
   
   
   ### Component(s)
   
   FlightRPC, Python


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to