Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 4d7148a85 -> e0e882506


AMBARI-21616. Stomp connections leak on ambari-agent (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e0e88250
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e0e88250
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e0e88250

Branch: refs/heads/branch-3.0-perf
Commit: e0e882506437d83cda6b50861a4290afe09def17
Parents: 4d7148a
Author: Andrew Onishuk <[email protected]>
Authored: Mon Jul 31 16:15:11 2017 +0300
Committer: Andrew Onishuk <[email protected]>
Committed: Mon Jul 31 16:15:11 2017 +0300

----------------------------------------------------------------------
 .../main/python/ambari_agent/HeartbeatThread.py | 11 ++++----
 .../python/ambari_agent/InitializerModule.py    | 11 ++++++--
 .../python/ambari_stomp/adapter/websocket.py    | 27 ++++++++++++++++----
 .../src/main/python/ambari_stomp/transport.py   |  8 ++++--
 4 files changed, 43 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/e0e88250/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py 
b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index ab24bb4..85840d0 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -91,12 +91,13 @@ class HeartbeatThread(threading.Thread):
           logger.exception("Exception in HeartbeatThread. Re-running the 
registration")
 
         self.initializer_module.is_registered = False
-        try:
-          self.initializer_module.connection.disconnect()
-        except:
-          # if exception happened due to connection problem, disconnect might 
not work
-          pass
+
         if hasattr(self.initializer_module, '_connection'):
+          try:
+            self.initializer_module.connection.disconnect()
+          except:
+            logger.exception("Exception during 
self.initializer_module.connection.disconnect()")
+
           delattr(self.initializer_module, '_connection')
 
       self.stop_event.wait(self.heartbeat_interval)

http://git-wip-us.apache.org/repos/asf/ambari/blob/e0e88250/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py 
b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index fb73d6d..6ef4a04 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -100,8 +100,15 @@ class InitializerModule:
     try:
       conn.start()
       conn.connect(wait=True)
-    except socket_error:
-      logger.warn("Could not connect to {0}".format(connection_url))
+    except Exception as ex:
+      try:
+        conn.disconnect()
+      except:
+        logger.exception("Exception during conn.disconnect()")
+
+      if isinstance(ex, socket_error):
+        logger.warn("Could not connect to {0}".format(connection_url))
+
       raise
 
     return conn

http://git-wip-us.apache.org/repos/asf/ambari/blob/e0e88250/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py 
b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
index e672a70..220f399 100644
--- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
+++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
@@ -104,9 +104,20 @@ class WsTransport(Transport):
 
   def stop(self):
     self.running = False
-    self.ws.close_connection()
-    self.disconnect_socket()
-    Transport.stop(self)
+    try:
+      self.ws.close_connection()
+    except:
+      logger.exception("Exception during self.ws.close_connection()")
+
+    try:
+      self.disconnect_socket()
+    except:
+      logger.exception("Exception during self.disconnect_socket()")
+
+    try:
+      Transport.stop(self)
+    except:
+      logger.exception("Exception during Transport.stop(self)")
 
 class WsConnection(BaseConnection, Protocol12):
   def __init__(self, url):
@@ -116,8 +127,14 @@ class WsConnection(BaseConnection, Protocol12):
     Protocol12.__init__(self, self.transport, (0, 0))
 
   def disconnect(self, receipt=None, headers=None, **keyword_headers):
-    Protocol12.disconnect(self, receipt, headers, **keyword_headers)
-    self.transport.stop()
+    try:
+      Protocol12.disconnect(self, receipt, headers, **keyword_headers)
+    except:
+      logger.exception("Exception during Protocol12.disconnect()")
+    try:
+      self.transport.stop()
+    except:
+      logger.exception("Exception during self.transport.stop()")
 
 class ConnectionResponseTimeout(StompException):
   """

http://git-wip-us.apache.org/repos/asf/ambari/blob/e0e88250/ambari-common/src/main/python/ambari_stomp/transport.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/transport.py 
b/ambari-common/src/main/python/ambari_stomp/transport.py
index d497d6a..32604fc 100644
--- a/ambari-common/src/main/python/ambari_stomp/transport.py
+++ b/ambari-common/src/main/python/ambari_stomp/transport.py
@@ -71,6 +71,7 @@ class BaseTransport(ambari_stomp.listener.Publisher):
         self.connection_error = False
         self.__receipts = {}
         self.current_host_and_port = None
+        self.receiver_thread = None
 
         # flag used when we receive the disconnect receipt
         self.__disconnect_receipt = None
@@ -106,8 +107,8 @@ class BaseTransport(ambari_stomp.listener.Publisher):
         """
         self.running = True
         self.attempt_connection()
-        receiver_thread = self.create_thread_fc(self.__receiver_loop)
-        receiver_thread.name = "StompReceiver%s" % getattr(receiver_thread, 
"name", "Thread")
+        self.receiver_thread = self.create_thread_fc(self.__receiver_loop)
+        self.receiver_thread.name = "StompReceiver%s" % 
getattr(self.receiver_thread, "name", "Thread")
         self.notify('connecting')
 
     def stop(self):
@@ -115,6 +116,9 @@ class BaseTransport(ambari_stomp.listener.Publisher):
         Stop the connection. Performs a clean shutdown by waiting for the
         receiver thread to exit.
         """
+        if not self.receiver_thread or not self.receiver_thread.is_alive():
+          return
+
         with self.__receiver_thread_exit_condition:
             while not self.__receiver_thread_exited:
                 self.__receiver_thread_exit_condition.wait()

Reply via email to