gromsterus commented on issue #52:
URL: 
https://github.com/apache/pulsar-client-python/issues/52#issuecomment-1331889157

   Hi, @whisust. Gunicorn forks the main process when it initializes each worker, 
so your approach won't work :(
   
   You could use [Flask Extensions 
Pattern](https://flask.palletsprojects.com/en/2.2.x/extensiondev/) & [Gunicorn 
Server Hooks](https://docs.gunicorn.org/en/stable/settings.html#server-hooks) 
together
   ```python
   # test_pulsar.py
   import logging
   import uuid
   from datetime import datetime
   from typing import Optional
   
   from _pulsar import Result
   from flask import Flask, make_response
   from pulsar import Client, MessageId, Producer, schema
   
   
   # Configure root logging once, at import time. The format string must stay
   # on a single line: splitting the literal across lines is a syntax error.
   logging.basicConfig(
       level=logging.INFO,
       format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s',
   )
   # Logger for this module's own messages.
   logger = logging.getLogger(__name__)
   # Separate logger that is passed to the Pulsar client as its `logger`.
   pulsar_logger = logging.getLogger('pulsar')
   
   
   class PulsarExt:
       """Flask extension that owns one Pulsar client and one producer.

       Constructing the extension only stores the broker URL. The client and
       the producer are created by `connect()`, so that call can be deferred
       until after the server has forked its worker processes.
       """

       def __init__(self, url: str) -> None:
           self._url = url
           # Both stay None until `connect()` runs, and are reset by `close()`.
           self._client: Optional[Client] = None
           self._g_producer: Optional[Producer] = None

       def init_app(self, app_: Flask) -> None:
           """Register this extension as ``app_.extensions['pulsar']``."""
           app_.extensions['pulsar'] = self

       @property
       def client(self) -> Client:
           """Return the connected client; asserts that `connect()` has run."""
           assert self._client is not None, 'Call `connect()` first'
           return self._client

       @property
       def g_producer(self) -> Producer:
           """Return the shared producer; asserts that `connect()` has run."""
           assert self._g_producer is not None, 'Call `connect()` first'
           return self._g_producer

       def connect(self) -> None:
           """Create the client and the shared producer.

           Any connection left over from a previous call is closed first, so
           calling `connect()` twice does not leak the old client/producer.
           """
           self.close()
           self._client = Client(
               self._url, authentication=None, logger=pulsar_logger
           )
           self._g_producer = self._client.create_producer(
               'non-persistent://public/default/test-gunicorn',
               # Random suffix keeps the producer name distinct per connect().
               producer_name=f'my-producer-{uuid.uuid4()}',
               schema=schema.StringSchema(),
           )

       def close(self) -> None:
           """Close the producer and the client and reset both to None.

           A no-op when nothing is connected, so it is safe to call from
           several shutdown paths. The client is still closed, and both
           references are cleared, even if closing the producer raises.
           """
           producer, client = self._g_producer, self._client
           # Clear the references up front so a failing close() cannot leave a
           # half-closed object reachable through the properties.
           self._g_producer = None
           self._client = None
           try:
               if producer is not None:
                   producer.close()
           finally:
               if client is not None:
                   client.close()
   
   
   # Module-level Flask application object.
   app = Flask(__name__)
   # Only stores the broker URL here; nothing connects until `connect()` is called.
   pulsar_ext = PulsarExt('pulsar://pulsar:6650')
   # Registers the extension under app.extensions['pulsar'].
   pulsar_ext.init_app(app)
   
   
   def init_app() -> None:
       """Connect the module-level Pulsar extension (creates its client and producer)."""
       pulsar_ext.connect()
   
   
   def teardown_app() -> None:
       """Close the module-level Pulsar extension's producer and client."""
       pulsar_ext.close()
   
   
   @app.post('/post-message')
   def post_pulsar_message():
       """Publish a timestamp message asynchronously and answer 201 right away.

       The send is fire-and-forget from the request's point of view: the
       outcome is reported later through `callback`.
       """
       logger.info(
           'Calling producer.send_async now, '
           'in the next lines there should be the callback result'
       )
       payload = f'dt={datetime.now().isoformat()}'
       producer = pulsar_ext.g_producer
       producer.send_async(content=payload, callback=callback)
       logger.info('After producer.send_async, returning the http response')
       return '', 201
   
   
   def callback(res: Result, _msg_id: MessageId):
       """Log the outcome of an asynchronous send.

       Args:
           res: Result code reported for the send.
           _msg_id: Id of the published message (unused).
       """
       # Only claim the broker acknowledged the event when the send succeeded;
       # otherwise surface the failing result code instead of a false success.
       if res != Result.Ok:
           logger.error('Callback result here! Send failed with result: %s', res)
           return
       logger.info('Callback result here! Event acknowledged by the broker.')
   
   
   @app.get('/')
   def healthcheck():
       """Health endpoint: log that the API is up and return a healthy status body."""
       logger.info('API Running fine')
       status_body = {'status': 'healthy'}
       return make_response(status_body)
   
   
   # gunicorn_conf.py
   # Import the hooks from the same module gunicorn serves (`test_pulsar:app`),
   # so they operate on the same `pulsar_ext` instance the request handlers use.
   from test_pulsar import init_app, teardown_app


   def post_worker_init(worker):
       """Gunicorn hook: connect to Pulsar once the worker has initialized the app."""
       init_app()


   def worker_int(worker):
       """Gunicorn hook: close the Pulsar connection when the worker gets SIGINT/SIGQUIT."""
       teardown_app()


   def worker_abort(worker):
       """Gunicorn hook: close the Pulsar connection when the worker gets SIGABRT."""
       teardown_app()


   def worker_exit(server, worker):
       """Gunicorn hook: close the Pulsar connection when the worker exits.

       Covers the normal shutdown path, which does not go through
       `worker_int` or `worker_abort`. `teardown_app()` is a no-op when the
       connection is already closed, so running after another hook is harmless.
       """
       teardown_app()
   
   ```
   And start webserver with
   ```sh
   gunicorn test_pulsar:app  --config gunicorn_conf.py --workers=2 --preload 
   ```
   
   Now the client and producer are created after each worker has been forked and initialized


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to