Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-106-Create-sqla-logging-handler 853d3dc7e -> 4e3fb0074


wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/eba3e875
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/eba3e875
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/eba3e875

Branch: refs/heads/ARIA-106-Create-sqla-logging-handler
Commit: eba3e875993f4aca69956f86271de01f1e83de23
Parents: 853d3dc
Author: mxmrlv <[email protected]>
Authored: Mon Feb 20 17:29:47 2017 +0200
Committer: mxmrlv <[email protected]>
Committed: Mon Feb 20 17:30:15 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py | 21 +++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/eba3e875/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 75bbbce..5f8a813 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -251,23 +251,34 @@ class ProcessExecutor(base.BaseExecutor):
 
 
 def _send_message(connection, message):
+
+    # Packing the length of the entire msg using struct.pack. This enables later reading of the content.
+    def _pack(data):
+        return struct.pack(_INT_FMT, len(data))
+
     data = jsonpickle.dumps(message)
-    connection.send(struct.pack(_INT_FMT, len(data)))
+    msg_metadata = _pack(data)
+    connection.send(msg_metadata)
     connection.sendall(data)
 
 
 def _recv_message(connection):
-    message_len, = struct.unpack(_INT_FMT, _recv_bytes(connection, _INT_SIZE))
-    return jsonpickle.loads(_recv_bytes(connection, message_len))
+    # Retrieving the length of the msg to come.
+    def _unpack(conn):
+        return struct.unpack(_INT_FMT, _recv_bytes(conn, _INT_SIZE, wait=True))[0]
+
+    msg_metadata_len = _unpack(connection)
+    msg = _recv_bytes(connection, msg_metadata_len)
+    return jsonpickle.loads(msg)
 
 
-def _recv_bytes(connection, count):
+def _recv_bytes(connection, count, wait=False):
     result = io.BytesIO()
     while True:
         if not count:
             return result.getvalue()
         read = connection.recv(count)
-        if not read:
+        if not wait and not read:
             return result.getvalue()
         result.write(read)
         count -= len(read)

Reply via email to