This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new d948ff0 AMBARI-23446. Fix connection drop on ambari-agent by locking the write code (aonishuk)
d948ff0 is described below
commit d948ff082cced05695a2d8dbed2c15c5e19556de
Author: Andrew Onishuk <[email protected]>
AuthorDate: Wed Apr 4 12:44:01 2018 +0300
AMBARI-23446. Fix connection drop on ambari-agent by locking the write code (aonishuk)
---
.../src/main/python/ambari_ws4py/websocket.py | 50 +++++++++++++++-------
1 file changed, 34 insertions(+), 16 deletions(-)
diff --git a/ambari-common/src/main/python/ambari_ws4py/websocket.py b/ambari-common/src/main/python/ambari_ws4py/websocket.py
index dbaa2b6..936f333 100644
--- a/ambari-common/src/main/python/ambari_ws4py/websocket.py
+++ b/ambari-common/src/main/python/ambari_ws4py/websocket.py
@@ -144,6 +144,11 @@ class WebSocket(object):
self._local_address = None
self._peer_address = None
+ self.lock = threading.Lock()
+
+ "Used to signal that the server side should be shutting down"
+ self.server_terminate_request = False
+
@property
def local_address(self):
"""
@@ -187,12 +192,17 @@ class WebSocket(object):
.. seealso:: Defined Status Codes
http://tools.ietf.org/html/rfc6455#section-7.4.1
"""
- if not self.server_terminated:
- self.server_terminated = True
- try:
- self._write(self.stream.close(code=code,
reason=reason).single(mask=self.stream.always_mask))
- except Exception as ex:
- logger.error("Error when terminating the connection: %s",
str(ex))
+
+ #If we are sending a fragmented frame with a generator this will make that stop
+ self.server_terminate_request = True
+
+ with self.lock:
+ if not self.server_terminated:
+ self.server_terminated = True
+ try:
+ self._write(self.stream.close(code=code,
reason=reason).single(mask=self.stream.always_mask))
+ except Exception as ex:
+ logger.error("Error when terminating the connection: %s",
str(ex))
def closed(self, code, reason=None):
"""
@@ -300,21 +310,29 @@ class WebSocket(object):
if isinstance(payload, basestring) or isinstance(payload, bytearray):
m = message_sender(payload).single(mask=self.stream.always_mask)
- self._write(m)
+ with self.lock:
+ self._write(m)
elif isinstance(payload, Message):
data = payload.single(mask=self.stream.always_mask)
- self._write(data)
+ with self.lock:
+ self._write(data)
elif type(payload) == types.GeneratorType:
- bytes = next(payload)
- first = True
- for chunk in payload:
- self._write(message_sender(bytes).fragment(first=first,
mask=self.stream.always_mask))
- bytes = chunk
- first = False
-
- self._write(message_sender(bytes).fragment(first=first, last=True,
mask=self.stream.always_mask))
+ with self.lock:
+ bytes = next(payload)
+ first = True
+ for chunk in payload:
+
+ #This lets close() interrupt even a long running send.
+ if self.server_terminate_request:
+ break
+
+ self._write(message_sender(bytes).fragment(first=first,
mask=self.stream.always_mask))
+ bytes = chunk
+ first = False
+
+ self._write(message_sender(bytes).fragment(first=first,
last=True, mask=self.stream.always_mask))
else:
raise ValueError("Unsupported type '%s' passed to send()" %
type(payload))
--
To stop receiving notification emails like this one, please contact
[email protected].