zbentley opened a new issue, #127:
URL: https://github.com/apache/pulsar-client-python/issues/127

   ## Context:
   - Pulsar 2.10.2 (StreamNative)
   - Pulsar Python client 2.10.1 and 3.1.0 (both reproduce the issue)
   - Python 3.10 and 3.7
   - Linux (Ubuntu 20) and macOS
   
   ## Bug description:
   
   It's common practice to clear out globally cached connection objects after a `fork` takes place. In Python, that's typically done via `os.register_at_fork`.
   
   Callbacks registered that way are similar to signal handlers: they shouldn't do anything complicated or perform I/O. However, one very common thing to do in an atfork callback is to wipe out a global connection cache, setting it to `None` in the child process to indicate that a new connection should be opened if needed.
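   
   A minimal sketch of that pattern, purely for illustration (the names `_CONNECTION`, `get_connection`, and `_open_connection` are hypothetical, not part of our code or of the Pulsar client):
   ```python
    import os
    
    _CONNECTION = None  # hypothetical global connection cache
    
    
    def _open_connection():
        # Placeholder for real connection setup (e.g. creating a pulsar.Client).
        return object()
    
    
    def get_connection():
        """Lazily open (or re-open) the cached connection."""
        global _CONNECTION
        if _CONNECTION is None:
            _CONNECTION = _open_connection()
        return _CONNECTION
    
    
    def _clear_connection_cache():
        # Runs in the child right after fork(): the child must not reuse the
        # parent's connection, so drop the reference and reconnect lazily on next use.
        global _CONNECTION
        _CONNECTION = None
    
    
    os.register_at_fork(after_in_child=_clear_connection_cache)
   ```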
   
   We do that with our Pulsar client connections. Since the Pulsar client isn't fork safe at all, we wipe out our global references to it whenever we fork, via `register_at_fork` handlers. Our code runs in many of our internal services that make _heavy_ use of Python forking; this is an extremely common practice in large Python installs: gunicorn, Celery, multiprocessing, billiard, Airflow, and others all rely heavily on fork-based concurrency to encapsulate workloads, share huge, expensive-to-load pre-forked memory, and evade the GIL.
   
   The most common forking system is `multiprocessing`, which provides a "run some work concurrently, then wrap up" interface.
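   
   Purely as an illustration of that interface (no Pulsar involved; the `work` function is made up):
   ```python
    from multiprocessing import Pool
    
    
    def work(x):
        return x * x
    
    
    if __name__ == "__main__":
        # Fork a few worker processes, run some work concurrently, then wrap up.
        with Pool(processes=4) as pool:
            results = pool.map(work, range(10))
        print(results)
   ```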
   
   We observed that arbitrary `multiprocessing` workloads across our fleet 
occasionally hang. Workloads that very frequently invoke `multiprocessing` 
pools after publishing to Pulsar were more likely to hang. 
   
   After some painful production incidents, we arrived at the below 
reproduction, which indicates an internal Pulsar client deadlock that occurs 
when a global connection cache is cleared out, triggering destruction inside 
the Pulsar-client C++ code. 
   
   **Note that this occurs on *any* invocation of `multiprocessing` in the 
presence of a Pulsar connection that is cleared by an atfork hook, regardless 
of whether the multiprocessing code, or the parent process, will ever use 
Pulsar again**. This is highly unfortunate.
   
   ## Steps to Reproduce
   
   1. Run a standalone Pulsar broker on localhost:6650.
   2. Put the below Python code in a `repro.py` file. Optionally, edit the test 
topic name to use a topic/namespace/tenant that exist for you locally.
   3. Do `while true; do python repro.py; done` to repeatedly invoke it.
   4. Wait ten minutes.
   
   Snippet:
   ```python
   import time
   from multiprocessing import Pool, set_start_method
   import pulsar
   import os
   
   
   CLIENT = None
   PROD = None
   
   def do_send():
       global CLIENT
       global PROD
       if PROD is None:
           CLIENT = pulsar.Client(service_url='pulsar://localhost:6650')
        PROD = CLIENT.create_producer(topic='persistent://chariot1/chariot_ns_sre--kms_test/chariot_topic_kms_test')
       PROD.send(b"123")
   
   
   def clear_connection_cache():
       global CLIENT
       global PROD
       print("Destroying connections", flush=True)
       CLIENT = PROD = None
       print("Destroyed connections", flush=True)
   
   
   def main():
       os.register_at_fork(after_in_child=clear_connection_cache)
       do_send()
       processes = 20
       set_start_method('fork')
       pool = Pool(
           processes=processes,
       )
   
       try:
        # map_async() is used here with get() and a timeout to obtain an interruptible
        # AsyncResult object, because regular map() calls are not interruptible.
           pool.map_async(
               time.sleep, [0.1 for _ in range(processes)]
           ).get(1)
       finally:
           pool.close()
           print("Joining pool", flush=True)
           pool.join()
           print("Joined pool", flush=True)
   
   
   if __name__ == '__main__':
       main()
   
   ```
   
   ## Expected behavior
   
   The loop should keep invoking the code successfully, over and over, even after ten minutes.
   
   More to the point, **all invocations of Pulsar client functionality, 
including destruction, should be "safe" to perform in all contexts: forks, 
threads, asyncio coroutines, trio coroutines, multiprocessing, Billiard, 
`pthread_atfork` hooks, signal handlers, etc**. "Safe" might mean "works" or it 
might mean "errors immediately saying 'this is not allowed'". But hanging 
forever is not safe.
   
   ## Observed behavior
   
   After some number of iterations, usually in under a minute, the code hangs at the `pool.join()` call.
   
   When that happens, the number of `Destroying connections` log lines and 
`Destroyed connections` log lines will not match up, indicating that one call 
to `CLIENT = PROD = None` is blocking.
   
   ## Root cause
   
   When the test code hangs, attaching a `py-spy` stack printer (with 
`--threads --native`) to the hung subprocess indicates that it is stuck in what 
appears to be code related to the client's destructor. The code in 
`_after_fork_child` in the below stack is exactly analogous to the 
`clear_connection_cache` code in the above snippet: 
   <img width="878" alt="image" src="https://github.com/apache/pulsar-client-python/assets/10999109/6a05e4a2-cc55-4139-928b-3fc04bd34d07">
   

