[ https://issues.apache.org/jira/browse/PROTON-2306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martin Husbyn updated PROTON-2306:
----------------------------------
Description:
Hi,
We currently have a simple service that consumes messages from Azure Service
Bus indefinitely and, based on the contents of each message, either ignores it
or sends a POST request to another internal service endpoint of our app.
However, we occasionally get the following error:
{quote}The connection was inactive for more than the allowed 240000
milliseconds and is closed by container '5942ad2288f8495aaea81fe91c935445_G10'.
{quote}
This is then shortly followed by:
{quote}amqp:connection:framing-error: SSL Failure: Unknown error
{quote}
I've observed that this error occurs almost exactly 4 minutes (i.e. 240000 ms)
after the service last processed an incoming message. This suggests to me that,
after processing a message, the client somehow stops sending the heartbeats
that keep the connection to Azure Service Bus open.
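For reference, the AMQP 1.0 specification has each peer advertise an idle-time-out in its open frame; the other peer must then send some frame, an empty one if it has nothing else to send, before that period runs out. Proton-C's transport is understood to schedule its keep-alive frames at half of the remote peer's advertised value. The stdlib-only Python sketch below models that arithmetic for the 240000 ms limit in the error message. The helper names are hypothetical and not part of Proton's API, and the sketch assumes the broker's effective limit is the 240000 ms quoted in the error; the report does not confirm that this is the value Azure Service Bus actually advertises.

```python
"""Simplified model of AMQP 1.0 idle-timeout arithmetic (hypothetical helpers, not Proton's API)."""

MS_PER_MINUTE = 60_000


def keepalive_due_ms(last_frame_sent_ms: int, remote_idle_timeout_ms: int) -> int:
    """Time by which a keep-alive (empty) frame should go out, assuming the
    sender schedules it at half of the peer's advertised idle-time-out."""
    return last_frame_sent_ms + remote_idle_timeout_ms // 2


def idle_limit_exceeded(last_frame_received_ms: int, now_ms: int, threshold_ms: int) -> bool:
    """True if the receiving peer has seen no frame for longer than its threshold."""
    return now_ms - last_frame_received_ms > threshold_ms


if __name__ == "__main__":
    threshold = 240_000  # assumed: the limit quoted in the Azure Service Bus error
    print(threshold / MS_PER_MINUTE)                         # 4.0 (minutes)
    print(keepalive_due_ms(0, threshold))                    # 120000
    # A client that stays completely silent trips the limit just after 4 minutes.
    print(idle_limit_exceeded(0, threshold, threshold))      # False
    print(idle_limit_exceeded(0, threshold + 1, threshold))  # True
```

Under these assumptions a healthy client would emit a frame at least every 120000 ms, so a disconnect exactly 240000 ms after the last message points to no frames at all leaving the client during that window.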
Any help or pointers in the right direction here would be much appreciated.
I've been looking into this for several days without really getting anywhere.
Below is what I believe to be the relevant code:
{code:python}
import json
import logging

from proton import Event
from proton.handlers import MessagingHandler
from proton.reactor import Container

# Config, EventFilter, FilterError, do_post_call, health_check and
# start_http_health_check_endpoint are application-specific and not shown here.


class TriggerConsumer(MessagingHandler):
    def __init__(
        self,
        server: str,
        address: str,
        event_filter,
    ):
        super().__init__()
        self._server = server
        self._address = address
        self._event_filter = event_filter
        # Read in on_message() but never assigned in the original snippet;
        # None is an assumed initial value.
        self._last_processing_time = None

    def on_start(self, event: Event):
        """Called when the event loop starts. See superclass docs for a (tiny)
        bit more info.

        We use it here to 'set up' our event receiver from the queue.
        """
        logging.info('Starting consumer...')
        # Source: https://www.johndehavilland.com/blog/2017/07/05/Python-ServiceBus-AMQP.html
        conn = event.container.connect(self._server, allowed_mechs='PLAIN')
        event.container.create_receiver(conn, self._address)
        logging.info('Started consumer')
        health_check.set_ready()

    def on_message(self, event: Event):
        logging.info('Receiving message')
        message_id = None
        try:
            message_id = event.message.id
            message_content = json.loads(event.message.body)
            try:
                self._event_filter.check_event(
                    event_body=message_content,
                    last_processed_event_time=self._last_processing_time
                )
            except FilterError as e:
                logging.info(f"Event with id '{message_id}' rejected by event filter: {e}")
                return
            # Do POST call to separate service in the cluster here
            do_post_call()
            logging.info(f'Successfully processed event with id {message_id}')
        # Message is not acked if we raise an exception, so to avoid a vicious
        # loop of crashing on the same message we catch all exceptions here.
        except Exception as e:
            logging.error(f'Failed to process event with id {message_id}: {e}')
            logging.exception(e)
            return


if __name__ == '__main__':
    logging.info('Starting service')
    # Spawns a separate thread with a simple HTTP server exposing health
    # monitoring endpoints to Docker
    start_http_health_check_endpoint()
    consumer = TriggerConsumer(
        # AMQP connection string, usually in the form of
        # 'amqps://{keyname}:{key}@{queue server name}.servicebus.windows.net'.
        server=Config.AMQP_SERVER_CONN,
        address=Config.QUEUE_NAME,
        event_filter=EventFilter()
    )
    Container(consumer).run()
{code}
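Proton's Container dispatches handler callbacks such as on_message() and performs the connection's socket I/O on the same thread, so while a callback blocks (for example inside do_post_call()), no frames, keep-alives included, can be written. The report does not establish that this is what happens here; the toy simulation below (stdlib-only, hypothetical names, not Proton code) only illustrates how one long-blocking callback can stretch the silence on the wire far beyond the keep-alive interval.

```python
"""Toy model: keep-alives on a single-threaded event loop (hypothetical, not Proton code)."""

from typing import List, Tuple


def frame_send_times(interval: float, busy: List[Tuple[float, float]], horizon: float) -> List[float]:
    """Times (in seconds) at which keep-alive frames actually go out.

    A keep-alive is due `interval` seconds after the previous frame, but if a
    blocking callback occupies the loop thread at that moment (a span in `busy`,
    given as sorted, non-overlapping (start, end) pairs), it is delayed until
    the callback returns.
    """
    sent = [0.0]
    while True:
        due = sent[-1] + interval
        if due > horizon:
            return sent
        for start, end in busy:
            if start <= due < end:
                due = end  # the loop thread is busy; the frame waits for the callback
        sent.append(due)


def longest_silence(sent: List[float]) -> float:
    """Largest gap between consecutive outgoing frames."""
    return max((b - a for a, b in zip(sent, sent[1:])), default=0.0)


if __name__ == "__main__":
    # Keep-alive every 120 s, observed for 10 minutes, nothing blocking.
    print(longest_silence(frame_send_times(120.0, [], 600.0)))                # 120.0
    # One callback blocks from t=100 s to t=400 s (e.g. a hypothetical hung HTTP call).
    print(longest_silence(frame_send_times(120.0, [(100.0, 400.0)], 600.0)))  # 400.0
```

Against a 240000 ms (240 s) limit, the second case, 400 s without an outgoing frame, would trip the broker's idle timeout even though the 120 s keep-alive interval itself is comfortably inside it. Bounding how long a callback may block, for instance with a timeout on the HTTP request, keeps that gap bounded as well.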
> Getting connection inactive error when consuming messages from Azure Service Bus
> --------------------------------------------------------------------------------
>
> Key: PROTON-2306
> URL: https://issues.apache.org/jira/browse/PROTON-2306
> Project: Qpid Proton
> Issue Type: Bug
> Components: python-binding
> Affects Versions: proton-c-0.31.0
> Environment: The service runs as 2 instances (pods) in a Kubernetes
> cluster hosted on Azure Kubernetes Service.
> Reporter: Martin Husbyn
> Priority: Minor
--
This message was sent by Atlassian Jira
(v8.3.4#803005)