[ 
https://issues.apache.org/jira/browse/PROTON-2306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Husbyn updated PROTON-2306:
----------------------------------
    Description: 
Hi,

We currently have a simple service that consumes messages from Azure Service 
Bus (indefinitely) and, based on the contents of each message, either ignores 
it or sends a POST request to another internal service endpoint for our app. 

However, occasionally we get the following exception:
{quote}The connection was inactive for more than the allowed 240000 
milliseconds and is closed by container '5942ad2288f8495aaea81fe91c935445_G10'.
{quote}
This is then shortly followed by:
{quote}amqp:connection:framing-error: SSL Failure: Unknown error
{quote}
I've observed that this error happens almost exactly 4 minutes (i.e. 
240000 ms) after the last time the service processed an incoming message. This 
suggests to me that, after processing a message, the client somehow stops 
sending heartbeats to Azure Service Bus to keep the connection open.
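
As a rough illustration of that timing check, the sketch below compares the 
gap between the last processed message and the disconnect against the 
240000 ms limit from the error text. The function name, the tolerance and the 
two timestamps are hypothetical and only there to show the arithmetic; they 
are not taken from our logs.

```python
from datetime import datetime

# Idle limit quoted in the error message above, in milliseconds.
IDLE_TIMEOUT_MS = 240_000


def gap_matches_idle_timeout(last_message_at: datetime,
                             disconnected_at: datetime,
                             timeout_ms: int = IDLE_TIMEOUT_MS,
                             tolerance_s: float = 5.0) -> bool:
    """Return True if the quiet period before the disconnect is within
    tolerance_s seconds of the idle timeout."""
    gap_s = (disconnected_at - last_message_at).total_seconds()
    return abs(gap_s - timeout_ms / 1000) <= tolerance_s


# Hypothetical timestamps, made up for illustration only.
last_message_at = datetime(2020, 1, 1, 12, 0, 0)
disconnected_at = datetime(2020, 1, 1, 12, 4, 1)
print(gap_matches_idle_timeout(last_message_at, disconnected_at))  # prints True
```

A gap that consistently lands on the timeout would point at a connection that 
is idle on the wire, rather than at a failure while a message is being handled.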

 

Any help or pointers in the right direction here would be much appreciated. 
I've been looking into this for several days without really getting anywhere.

 

Below is what I believe to be the relevant code:

 
{code:python}
import json
import logging

from proton import Event
from proton.handlers import MessagingHandler
from proton.reactor import Container

# Config, EventFilter, FilterError, health_check, do_post_call and
# start_http_health_check_endpoint belong to our service and are not shown here.


class TriggerConsumer(MessagingHandler):
    def __init__(
        self,
        server: str,
        address: str,
        event_filter,
    ):
        super().__init__()
        self._server = server
        self._address = address
        self._event_filter = event_filter
        # Read by on_message below; the original snippet never sets it,
        # so default to None here.
        self._last_processing_time = None

    def on_start(self, event: Event):
        """Called when the event loop starts. See superclass docs for a (tiny) bit more info.

        We use it here to 'set up' our event receiver from the queue.
        """
        logging.info('Starting consumer...')
        # Source: https://www.johndehavilland.com/blog/2017/07/05/Python-ServiceBus-AMQP.html
        conn = event.container.connect(self._server, allowed_mechs='PLAIN')
        event.container.create_receiver(conn, self._address)
        logging.info('Started consumer')
        health_check.set_ready()

    def on_message(self, event: Event):
        logging.info('Receiving message')
        message_id = None
        try:
            message_id = event.message.id
            message_content = json.loads(event.message.body)
            try:
                self._event_filter.check_event(
                    event_body=message_content,
                    last_processed_event_time=self._last_processing_time,
                )
            except FilterError as e:
                logging.info(f"Event with id '{message_id}' rejected by event filter: {e}")
                return

            # Do POST call to separate service in the cluster here
            do_post_call()

            logging.info(f'Successfully processed event with id {message_id}')
        # Message is not acked if we raise an exception, so to avoid a vicious
        # loop of crashing on the same message we catch all exceptions here.
        except Exception as e:
            logging.error(f'Failed to process event with id {message_id}: {e}')
            logging.exception(e)
            return


if __name__ == '__main__':
    logging.info('Starting service')
    # Spawns a separate thread with a simple HTTP server exposing health
    # monitoring endpoints to Docker
    start_http_health_check_endpoint()
    consumer = TriggerConsumer(
        # AMQP connection string, usually in the form of
        # 'amqps://{keyname}:{key}@{queue server name}.servicebus.windows.net'.
        server=Config.AMQP_SERVER_CONN,
        address=Config.QUEUE_NAME,
        event_filter=EventFilter(),
    )
    Container(consumer).run()
{code}


> Getting connection inactive error when consuming messages from Azure Service 
> Bus
> --------------------------------------------------------------------------------
>
>                 Key: PROTON-2306
>                 URL: https://issues.apache.org/jira/browse/PROTON-2306
>             Project: Qpid Proton
>          Issue Type: Bug
>          Components: python-binding
>    Affects Versions: proton-c-0.31.0
>         Environment: The service runs as 2 instances (pods) in a Kubernetes 
> cluster hosted on Azure Kubernetes Service.
>            Reporter: Martin Husbyn
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)