Repository: ambari Updated Branches: refs/heads/branch-3.0-perf d188fc3cd -> 4d7148a85
AMBARI-21574. Agent hangs when server is restarted during connection stage (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4d7148a8 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4d7148a8 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4d7148a8 Branch: refs/heads/branch-3.0-perf Commit: 4d7148a85ffb8f3c27ca70d79350823aaa563a2f Parents: d188fc3 Author: Andrew Onishuk <[email protected]> Authored: Mon Jul 31 15:33:16 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Mon Jul 31 15:33:16 2017 +0300 ---------------------------------------------------------------------- .../python/ambari_stomp/adapter/websocket.py | 19 +++++++++++++++++++ .../src/main/python/ambari_stomp/transport.py | 3 +++ 2 files changed, 22 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7148a8/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py index 91eaaac..e672a70 100644 --- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py +++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py @@ -24,11 +24,14 @@ from Queue import Queue from ambari_stomp.connect import BaseConnection from ambari_stomp.protocol import Protocol12 from ambari_stomp.transport import Transport, DEFAULT_SSL_VERSION +from ambari_stomp.exception import StompException from ambari_ws4py.client.threadedclient import WebSocketClient logger = logging.getLogger(__name__) +DEFAULT_CONNECTION_TIMEOUT = 10 + class QueuedWebSocketClient(WebSocketClient): def __init__(self, *args, **kwargs): WebSocketClient.__init__(self, *args, **kwargs) @@ -68,6 +71,17 @@ class WsTransport(Transport): self.ws = QueuedWebSocketClient(url, protocols=['http-only', 'chat']) self.ws.daemon = False + def wait_for_connection(self, timeout=DEFAULT_CONNECTION_TIMEOUT): + """ + Wait until we've established a connection with the server. + + :param float timeout: how long to wait, in seconds + """ + with self.get_connect_wait_condition(): + self.get_connect_wait_condition().wait(timeout) + if not self.is_connected() and not self.connection_error: + raise ConnectionResponseTimeout("Waiting for connection confirmation timed out") + def is_connected(self): return self.connected @@ -104,3 +118,8 @@ class WsConnection(BaseConnection, Protocol12): def disconnect(self, receipt=None, headers=None, **keyword_headers): Protocol12.disconnect(self, receipt, headers, **keyword_headers) self.transport.stop() + +class ConnectionResponseTimeout(StompException): + """ + Raised when sent 'STOMP' frame and have not received 'CONNECTED' a certain timeout. + """ http://git-wip-us.apache.org/repos/asf/ambari/blob/4d7148a8/ambari-common/src/main/python/ambari_stomp/transport.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_stomp/transport.py b/ambari-common/src/main/python/ambari_stomp/transport.py index 346cb17..d497d6a 100644 --- a/ambari-common/src/main/python/ambari_stomp/transport.py +++ b/ambari-common/src/main/python/ambari_stomp/transport.py @@ -301,6 +301,9 @@ class BaseTransport(ambari_stomp.listener.Publisher): Disconnect the socket. """ + def get_connect_wait_condition(self): + return self.__connect_wait_condition + def wait_for_connection(self, timeout=None): """ Wait until we've established a connection with the server.
