This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d948ff0  AMBARI-23446. Fix connection drop on ambari-agent by locking the write code (aonishuk)
d948ff0 is described below

commit d948ff082cced05695a2d8dbed2c15c5e19556de
Author: Andrew Onishuk <aonis...@hortonworks.com>
AuthorDate: Wed Apr 4 12:44:01 2018 +0300

    AMBARI-23446. Fix connection drop on ambari-agent by locking the write code (aonishuk)
---
 .../src/main/python/ambari_ws4py/websocket.py      | 50 +++++++++++++++-------
 1 file changed, 34 insertions(+), 16 deletions(-)

diff --git a/ambari-common/src/main/python/ambari_ws4py/websocket.py b/ambari-common/src/main/python/ambari_ws4py/websocket.py
index dbaa2b6..936f333 100644
--- a/ambari-common/src/main/python/ambari_ws4py/websocket.py
+++ b/ambari-common/src/main/python/ambari_ws4py/websocket.py
@@ -144,6 +144,11 @@ class WebSocket(object):
         self._local_address = None
         self._peer_address = None
 
+        self.lock = threading.Lock()
+
+        "Used to signal that the server side should be shutting down"
+        self.server_terminate_request = False
+
     @property
     def local_address(self):
         """
@@ -187,12 +192,17 @@ class WebSocket(object):
 
         .. seealso:: Defined Status Codes http://tools.ietf.org/html/rfc6455#section-7.4.1
         """
-        if not self.server_terminated:
-            self.server_terminated = True
-            try:
-                self._write(self.stream.close(code=code, reason=reason).single(mask=self.stream.always_mask))
-            except Exception as ex:
-                logger.error("Error when terminating the connection: %s", str(ex))
+
+        #If we are sending a fragmented frame with a generator this will make that stop
+        self.server_terminate_request = True
+
+        with self.lock:
+          if not self.server_terminated:
+              self.server_terminated = True
+              try:
+                  self._write(self.stream.close(code=code, reason=reason).single(mask=self.stream.always_mask))
+              except Exception as ex:
+                  logger.error("Error when terminating the connection: %s", str(ex))
 
     def closed(self, code, reason=None):
         """
@@ -300,21 +310,29 @@ class WebSocket(object):
 
         if isinstance(payload, basestring) or isinstance(payload, bytearray):
             m = message_sender(payload).single(mask=self.stream.always_mask)
-            self._write(m)
+            with self.lock:
+                self._write(m)
 
         elif isinstance(payload, Message):
             data = payload.single(mask=self.stream.always_mask)
-            self._write(data)
+            with self.lock:
+                self._write(data)
 
         elif type(payload) == types.GeneratorType:
-            bytes = next(payload)
-            first = True
-            for chunk in payload:
-                self._write(message_sender(bytes).fragment(first=first, mask=self.stream.always_mask))
-                bytes = chunk
-                first = False
-
-            self._write(message_sender(bytes).fragment(first=first, last=True, mask=self.stream.always_mask))
+            with self.lock:
+                bytes = next(payload)
+                first = True
+                for chunk in payload:
+
+                    #This lets close() interrupt even a long running send.
+                    if self.server_terminate_request:
+                        break
+
+                    self._write(message_sender(bytes).fragment(first=first, mask=self.stream.always_mask))
+                    bytes = chunk
+                    first = False
+
+                self._write(message_sender(bytes).fragment(first=first, last=True, mask=self.stream.always_mask))
 
         else:
             raise ValueError("Unsupported type '%s' passed to send()" % type(payload))

-- 
To stop receiving notification emails like this one, please contact
aonis...@apache.org.

Reply via email to