Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 4d7148a85 -> e0e882506
AMBARI-21616. Stomp connections leak on ambari-agent (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e0e88250 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e0e88250 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e0e88250 Branch: refs/heads/branch-3.0-perf Commit: e0e882506437d83cda6b50861a4290afe09def17 Parents: 4d7148a Author: Andrew Onishuk <[email protected]> Authored: Mon Jul 31 16:15:11 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Mon Jul 31 16:15:11 2017 +0300 ---------------------------------------------------------------------- .../main/python/ambari_agent/HeartbeatThread.py | 11 ++++---- .../python/ambari_agent/InitializerModule.py | 11 ++++++-- .../python/ambari_stomp/adapter/websocket.py | 27 ++++++++++++++++---- .../src/main/python/ambari_stomp/transport.py | 8 ++++-- 4 files changed, 43 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/e0e88250/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index ab24bb4..85840d0 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -91,12 +91,13 @@ class HeartbeatThread(threading.Thread): logger.exception("Exception in HeartbeatThread. 
Re-running the registration") self.initializer_module.is_registered = False - try: - self.initializer_module.connection.disconnect() - except: - # if exception happened due to connection problem, disconnect might not work - pass + if hasattr(self.initializer_module, '_connection'): + try: + self.initializer_module.connection.disconnect() + except: + logger.exception("Exception during self.initializer_module.connection.disconnect()") + delattr(self.initializer_module, '_connection') self.stop_event.wait(self.heartbeat_interval) http://git-wip-us.apache.org/repos/asf/ambari/blob/e0e88250/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index fb73d6d..6ef4a04 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -100,8 +100,15 @@ class InitializerModule: try: conn.start() conn.connect(wait=True) - except socket_error: - logger.warn("Could not connect to {0}".format(connection_url)) + except Exception as ex: + try: + conn.disconnect() + except: + logger.exception("Exception during conn.disconnect()") + + if isinstance(ex, socket_error): + logger.warn("Could not connect to {0}".format(connection_url)) + raise return conn http://git-wip-us.apache.org/repos/asf/ambari/blob/e0e88250/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py index e672a70..220f399 100644 --- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py +++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py @@ -104,9 +104,20 @@ class 
WsTransport(Transport): def stop(self): self.running = False - self.ws.close_connection() - self.disconnect_socket() - Transport.stop(self) + try: + self.ws.close_connection() + except: + logger.exception("Exception during self.ws.close_connection()") + + try: + self.disconnect_socket() + except: + logger.exception("Exception during self.disconnect_socket()") + + try: + Transport.stop(self) + except: + logger.exception("Exception during Transport.stop(self)") class WsConnection(BaseConnection, Protocol12): def __init__(self, url): @@ -116,8 +127,14 @@ class WsConnection(BaseConnection, Protocol12): Protocol12.__init__(self, self.transport, (0, 0)) def disconnect(self, receipt=None, headers=None, **keyword_headers): - Protocol12.disconnect(self, receipt, headers, **keyword_headers) - self.transport.stop() + try: + Protocol12.disconnect(self, receipt, headers, **keyword_headers) + except: + logger.exception("Exception during Protocol12.disconnect()") + try: + self.transport.stop() + except: + logger.exception("Exception during self.transport.stop()") class ConnectionResponseTimeout(StompException): """ http://git-wip-us.apache.org/repos/asf/ambari/blob/e0e88250/ambari-common/src/main/python/ambari_stomp/transport.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_stomp/transport.py b/ambari-common/src/main/python/ambari_stomp/transport.py index d497d6a..32604fc 100644 --- a/ambari-common/src/main/python/ambari_stomp/transport.py +++ b/ambari-common/src/main/python/ambari_stomp/transport.py @@ -71,6 +71,7 @@ class BaseTransport(ambari_stomp.listener.Publisher): self.connection_error = False self.__receipts = {} self.current_host_and_port = None + self.receiver_thread = None # flag used when we receive the disconnect receipt self.__disconnect_receipt = None @@ -106,8 +107,8 @@ class BaseTransport(ambari_stomp.listener.Publisher): """ self.running = True self.attempt_connection() - receiver_thread 
= self.create_thread_fc(self.__receiver_loop) - receiver_thread.name = "StompReceiver%s" % getattr(receiver_thread, "name", "Thread") + self.receiver_thread = self.create_thread_fc(self.__receiver_loop) + self.receiver_thread.name = "StompReceiver%s" % getattr(self.receiver_thread, "name", "Thread") self.notify('connecting') def stop(self): @@ -115,6 +116,9 @@ class BaseTransport(ambari_stomp.listener.Publisher): Stop the connection. Performs a clean shutdown by waiting for the receiver thread to exit. """ + if not self.receiver_thread or not self.receiver_thread.is_alive(): + return + with self.__receiver_thread_exit_condition: while not self.__receiver_thread_exited: self.__receiver_thread_exit_condition.wait()
