zbentley opened a new issue, #127:
URL: https://github.com/apache/pulsar-client-python/issues/127
## Context:
- Pulsar 2.10.2 (StreamNative)
- Python Pulsar client 2.10.1 and 3.1.0 (both reproduce the issue).
- Python 3.10 and 3.7
- Linux (Ubuntu 20) and macOS.
## Bug description:
It's common practice to clear out globally cached connection objects after a
`fork` takes place. That's typically done via the `os.register_at_fork`
function in Python.
Callbacks registered with that function are similar to signal handlers: they
shouldn't do anything complicated or I/O-heavy. However, a very common thing to
do in an at-fork callback is to wipe out a global connection cache, setting it
to `None` in the child process to indicate that a new connection should be
opened if needed.
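For context, a minimal sketch of that pattern looks like the following
(hypothetical names; our real handler is the `_after_fork_child` function that
appears in the stack trace later in this report):
```python
import os

_connection = None  # hypothetical global connection cache


def _clear_connection_cache():
    # In the child process, drop the inherited connection object so the next
    # use lazily opens a fresh connection instead of reusing the parent's
    # sockets and threads.
    global _connection
    _connection = None


# Runs in every child created by os.fork(), including forks performed by
# fork-based multiprocessing.
os.register_at_fork(after_in_child=_clear_connection_cache)
```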
We do that with our Pulsar client connections. Since the Pulsar client isn't
fork safe at all, we wipe out our global references to it whenever we fork, via
`register_at_fork` handlers. Our code runs in tons of our internal services
that make _heavy_ use of Python forking; this is an extremely common practice
in large Python installs: gunicorn, Celery, multiprocessing, billiard, airflow,
and others all heavily use fork-based concurrency to encapsulate workloads,
share huge, expensive-to-load pre-forked memory, and evade the GIL.
The most common forking system is `multiprocessing`, which provides a "run
some work concurrently, then wrap up" interface.
We observed that arbitrary `multiprocessing` workloads across our fleet
occasionally hang. Workloads that very frequently invoke `multiprocessing`
pools after publishing to Pulsar were more likely to hang.
After some painful production incidents, we arrived at the below
reproduction, which indicates an internal Pulsar client deadlock that occurs
when a global connection cache is cleared out, triggering destruction inside
the Pulsar-client C++ code.
**Note that this occurs on *any* invocation of `multiprocessing` in the
presence of a Pulsar connection that is cleared by an atfork hook, regardless
of whether the multiprocessing code, or the parent process, will ever use
Pulsar again**. This is highly unfortunate.
## Steps to Reproduce
1. Run a standalone Pulsar broker on `localhost:6650`.
2. Put the below Python code in a `repro.py` file. Optionally, edit the test
topic name to use a topic/namespace/tenant that exist for you locally.
3. Do `while true; do python repro.py; done` to repeatedly invoke it.
4. Wait ten minutes.
Snippet:
```python
import os
import time
from multiprocessing import Pool, set_start_method

import pulsar

CLIENT = None
PROD = None


def do_send():
    global CLIENT
    global PROD
    if PROD is None:
        CLIENT = pulsar.Client(service_url='pulsar://localhost:6650')
        PROD = CLIENT.create_producer(
            topic='persistent://chariot1/chariot_ns_sre--kms_test/chariot_topic_kms_test'
        )
    PROD.send(b"123")


def clear_connection_cache():
    global CLIENT
    global PROD
    print("Destroying connections", flush=True)
    CLIENT = PROD = None
    print("Destroyed connections", flush=True)


def main():
    os.register_at_fork(after_in_child=clear_connection_cache)
    do_send()
    processes = 20
    set_start_method('fork')
    pool = Pool(processes=processes)
    try:
        # Map_async with timeout is used here with get and a super large
        # timeout to get an interruptible AsyncResult object. This is because
        # regular map() calls are not interruptible.
        pool.map_async(
            time.sleep, [0.1 for _ in range(processes)]
        ).get(1)
    finally:
        pool.close()
        print("Joining pool", flush=True)
        pool.join()
        print("Joined pool", flush=True)


if __name__ == '__main__':
    main()
```
## Expected behavior
The code should continue to run successfully, invocation after invocation,
well past the ten-minute mark.
More to the point **all invocations of Pulsar client functionality,
including destruction, should be "safe" to perform in all contexts: forks,
threads, asyncio coroutines, trio coroutines, multiprocessing, Billiard,
`pthread_atfork` hooks, signal handlers, etc**. "Safe" might mean "works" or it
might mean "errors immediately saying 'this is not allowed'". But hanging
forever is not safe.
## Observed behavior
After some number of iterations, usually in under a minute, the code hangs at
the `pool.join()` call.
When that happens, the number of `Destroying connections` log lines and
`Destroyed connections` log lines will not match up, indicating that one call
to `CLIENT = PROD = None` is blocking.
## Root cause
When the test code hangs, attaching a `py-spy` stack printer (with
`--threads --native`) to the hung subprocess shows that it is stuck in what
appears to be code related to the client's destructor. The `_after_fork_child`
code in the stack below is exactly analogous to the `clear_connection_cache`
code in the snippet above:
<img width="878" alt="image"
src="https://github.com/apache/pulsar-client-python/assets/10999109/6a05e4a2-cc55-4139-928b-3fc04bd34d07">