zbentley opened a new issue #14588:
URL: https://github.com/apache/pulsar/issues/14588


   **Describe the bug**
   In Python, I have to reconnect `Client` objects a lot, usually because of bugs. 
`client.close` and `client.shutdown` are sufficient to disconnect a client 
object. However, a disconnected client object can't be reconnected, so instead 
I instantiate a brand new `Client` object to replace it.
   
   While doing this, some of my production services started crashing due to 
file descriptor exhaustion.
   
   It turns out that, if you have called `.subscribe` at least once, closing the 
consumer object and then closing the client does not close all file handles the 
process opened. Those handles (a fair number of them: one or two per worker 
thread per partition, it looks like) leak, consuming resources and pushing the 
process closer to exhaustion.
   
   This *seems* to only happen if:
   - The topic is partitioned.
   - The topic already exists, rather than being created automatically by the 
subscription.
   
   
   
   **To Reproduce**
   1. Create a partitioned, persistent topic with at least 2 partitions.
   2. Update `TOPIC_NAME` in the code under **Code to reproduce** to the name of 
the topic you created.
   3. Run that code.
   4. Observe the printed file handle diff: the number of file handles the 
process holds open grows with each iteration.
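
For step 1, one way to create the partitioned topic from Python is through the broker's admin REST API. This is only a sketch under assumptions that are not part of the original report: the admin endpoint is taken to be `http://localhost:8080` (the standalone default), the tenant and namespace are `public` and `default`, and the topic name `leak-test` and both function names are hypothetical.

```python
import urllib.request


def build_create_partitioned_topic_request(tenant, namespace, topic, partitions,
                                           admin_url="http://localhost:8080"):
    """Build (but do not send) the admin request that creates a partitioned topic.

    admin_url is an assumed default for a local standalone broker.
    """
    url = f"{admin_url}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions"
    # The request body is the partition count as a bare JSON number.
    return urllib.request.Request(
        url,
        data=str(partitions).encode(),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def create_partitioned_topic(tenant, namespace, topic, partitions):
    """Send the request; urlopen raises urllib.error.HTTPError on a non-2xx reply."""
    request = build_create_partitioned_topic_request(tenant, namespace, topic, partitions)
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling `create_partitioned_topic("public", "default", "leak-test", 2)` against a running broker would then correspond to `TOPIC_NAME = "persistent://public/default/leak-test"` in the reproduction script.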
   
   **Expected behavior**
   - Creating then `.close`ing a `Consumer` object should result in net zero 
file handle changes on the system.
   - Creating then `.close`ing a `Producer` object should result in net zero 
file handle changes on the system.
   - Creating then `.close`ing a `Client` object should result in net zero file 
handle changes on the system.
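
The "net zero" expectation above could be turned into a hard check with a small helper. This is a minimal sketch, assuming a platform where `/dev/fd` lists the current process's descriptors (macOS and Linux); the names `count_open_fds` and `assert_no_fd_leak` are hypothetical. Unlike `lsof`, it counts only numbered file descriptors, not entries such as the working directory or mapped libraries.

```python
import os
from contextlib import contextmanager


def count_open_fds():
    """Count this process's open file descriptors via /dev/fd.

    Listing the directory briefly opens one extra descriptor, but that
    happens on every call, so before/after comparisons stay consistent.
    """
    return len(os.listdir("/dev/fd"))


@contextmanager
def assert_no_fd_leak():
    """Raise AssertionError if the wrapped block changes the open-descriptor count."""
    before = count_open_fds()
    yield
    after = count_open_fds()
    if after != before:
        raise AssertionError(
            f"file descriptor count changed: {before} before, {after} after"
        )
```

Wrapping a create-and-close cycle in `with assert_no_fd_leak():` would then fail loudly on a leak instead of relying on reading the printed diff.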
   
   **Code to reproduce**
   
   ```python
   from contextlib import contextmanager
   
   import logging
   from pulsar import Client, ConsumerType
   
   TOPIC_NAME = "persistent://your topic name here"
   
   def get_open_filehandles():
       # Return the sorted lines of `lsof -p <pid>` for the current process.
       import os
       import subprocess
       lines = subprocess.run(['lsof', '-p', str(os.getpid())],
                              capture_output=True)
       return sorted(lines.stdout.decode().split('\n'))
   
   
   @contextmanager
   def diff_file_handles():
       initial = set(get_open_filehandles())
       try:
           yield
       finally:
           final = set(get_open_filehandles())
           for fh in final:
               if fh not in initial:
                   print("NEW FILEHANDLE", fh)
           for fh in initial:
               if fh not in final:
                   print("CLOSED FILEHANDLE", fh)
           print("Handles before:", len(initial), "Handles after:", len(final))
   
   
   def consume_and_toss():
       client = Client(
           service_url='pulsar://localhost',
           logger=logging.getLogger(),
           io_threads=1,
           message_listener_threads=1,
       )
       sub = client.subscribe(
           topic=TOPIC_NAME,
           subscription_name='testsub',
           receiver_queue_size=1,
           max_total_receiver_queue_size_across_partitions=1,
           consumer_type=ConsumerType.KeyShared,
           replicate_subscription_state_enabled=False,
   
       )
       sub.close()
       del sub
       client.shutdown()
       client.close()
       del client
   
   
   def main():
       # Prime caches, load dylibs:
       consume_and_toss()
   
       for _ in range(4):
           print("ITERATING")
           with diff_file_handles():
               consume_and_toss()
   
   if __name__ == '__main__':
       main()
   ```
   
   **Desktop (please complete the following information):**
    - OS: macOS Monterey.
    - Pulsar: 2.9.1 via Homebrew. 
    - Client: 2.9.1.
   
   

