Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf d188fc3cd -> 4d7148a85


AMBARI-21574. Agent hangs when server is restarted during connection stage 
(aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4d7148a8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4d7148a8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4d7148a8

Branch: refs/heads/branch-3.0-perf
Commit: 4d7148a85ffb8f3c27ca70d79350823aaa563a2f
Parents: d188fc3
Author: Andrew Onishuk <[email protected]>
Authored: Mon Jul 31 15:33:16 2017 +0300
Committer: Andrew Onishuk <[email protected]>
Committed: Mon Jul 31 15:33:16 2017 +0300

----------------------------------------------------------------------
 .../python/ambari_stomp/adapter/websocket.py     | 19 +++++++++++++++++++
 .../src/main/python/ambari_stomp/transport.py    |  3 +++
 2 files changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7148a8/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py 
b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
index 91eaaac..e672a70 100644
--- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
+++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
@@ -24,11 +24,14 @@ from Queue import Queue
 from ambari_stomp.connect import BaseConnection
 from ambari_stomp.protocol import Protocol12
 from ambari_stomp.transport import Transport, DEFAULT_SSL_VERSION
+from ambari_stomp.exception import StompException
 
 from ambari_ws4py.client.threadedclient import WebSocketClient
 
 logger = logging.getLogger(__name__)
 
+DEFAULT_CONNECTION_TIMEOUT = 10
+
 class QueuedWebSocketClient(WebSocketClient):
   def __init__(self, *args, **kwargs):
     WebSocketClient.__init__(self, *args, **kwargs)
@@ -68,6 +71,17 @@ class WsTransport(Transport):
     self.ws = QueuedWebSocketClient(url, protocols=['http-only', 'chat'])
     self.ws.daemon = False
 
+  def wait_for_connection(self, timeout=DEFAULT_CONNECTION_TIMEOUT):
+    """
+    Wait until we've established a connection with the server.
+
+    :param float timeout: how long to wait, in seconds
+    """
+    with self.get_connect_wait_condition():
+      self.get_connect_wait_condition().wait(timeout)
+      if not self.is_connected() and not self.connection_error:
+        raise ConnectionResponseTimeout("Waiting for connection confirmation 
timed out")
+
   def is_connected(self):
     return self.connected
 
@@ -104,3 +118,8 @@ class WsConnection(BaseConnection, Protocol12):
   def disconnect(self, receipt=None, headers=None, **keyword_headers):
     Protocol12.disconnect(self, receipt, headers, **keyword_headers)
     self.transport.stop()
+
+class ConnectionResponseTimeout(StompException):
+  """
+  Raised when sent 'STOMP' frame and have not received 'CONNECTED' a certain 
timeout.
+  """

http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7148a8/ambari-common/src/main/python/ambari_stomp/transport.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/transport.py 
b/ambari-common/src/main/python/ambari_stomp/transport.py
index 346cb17..d497d6a 100644
--- a/ambari-common/src/main/python/ambari_stomp/transport.py
+++ b/ambari-common/src/main/python/ambari_stomp/transport.py
@@ -301,6 +301,9 @@ class BaseTransport(ambari_stomp.listener.Publisher):
         Disconnect the socket.
         """
 
+    def get_connect_wait_condition(self):
+      return self.__connect_wait_condition
+
     def wait_for_connection(self, timeout=None):
         """
         Wait until we've established a connection with the server.

Reply via email to