Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-106-Create-sqla-logging-handler 853d3dc7e -> 4e3fb0074
wip

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/eba3e875
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/eba3e875
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/eba3e875

Branch: refs/heads/ARIA-106-Create-sqla-logging-handler
Commit: eba3e875993f4aca69956f86271de01f1e83de23
Parents: 853d3dc
Author: mxmrlv <[email protected]>
Authored: Mon Feb 20 17:29:47 2017 +0200
Committer: mxmrlv <[email protected]>
Committed: Mon Feb 20 17:30:15 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py | 21 +++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/eba3e875/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 75bbbce..5f8a813 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -251,23 +251,34 @@ class ProcessExecutor(base.BaseExecutor):
 
 
 def _send_message(connection, message):
+
+    # Packing the length of the entire msg using struct.pack. This enables later reading of the content.
+    def _pack(data):
+        return struct.pack(_INT_FMT, len(data))
+
     data = jsonpickle.dumps(message)
-    connection.send(struct.pack(_INT_FMT, len(data)))
+    msg_metadata = _pack(data)
+    connection.send(msg_metadata)
     connection.sendall(data)
 
 
 def _recv_message(connection):
-    message_len, = struct.unpack(_INT_FMT, _recv_bytes(connection, _INT_SIZE))
-    return jsonpickle.loads(_recv_bytes(connection, message_len))
+    # Retrieving the length of the msg to come.
+    def _unpack(conn):
+        return struct.unpack(_INT_FMT, _recv_bytes(conn, _INT_SIZE, wait=True))[0]
+
+    msg_metadata_len = _unpack(connection)
+    msg = _recv_bytes(connection, msg_metadata_len)
+    return jsonpickle.loads(msg)
 
 
-def _recv_bytes(connection, count):
+def _recv_bytes(connection, count, wait=False):
     result = io.BytesIO()
     while True:
         if not count:
             return result.getvalue()
         read = connection.recv(count)
-        if not read:
+        if not wait and not read:
             return result.getvalue()
         result.write(read)
         count -= len(read)
