multiple responses
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d6efe896 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d6efe896 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d6efe896 Branch: refs/heads/ignite-7777 Commit: d6efe8966c6ddcb85f3ceb0b9f299dc476bc2697 Parents: 4159da8 Author: ekaterina-nbl <[email protected]> Authored: Fri May 11 01:33:28 2018 +0300 Committer: ekaterina-nbl <[email protected]> Committed: Fri May 11 01:33:28 2018 +0300 ---------------------------------------------------------------------- .../nodejs/lib/internal/ClientSocket.js | 58 +++++++++++--------- .../nodejs/lib/internal/MessageBuffer.js | 3 +- 2 files changed, 33 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d6efe896/modules/platforms/nodejs/lib/internal/ClientSocket.js ---------------------------------------------------------------------- diff --git a/modules/platforms/nodejs/lib/internal/ClientSocket.js b/modules/platforms/nodejs/lib/internal/ClientSocket.js index 0e4e713..e32ef21 100644 --- a/modules/platforms/nodejs/lib/internal/ClientSocket.js +++ b/modules/platforms/nodejs/lib/internal/ClientSocket.js @@ -198,38 +198,42 @@ class ClientSocket { if (this._state === STATE.DISCONNECTED) { return; } - const buffer = MessageBuffer.from(message); - // Response length - const length = buffer.readInteger(); - let requestId, isSuccess; - const isHandshake = this._state === STATE.HANDSHAKE; - - if (isHandshake) { - // Handshake status - isSuccess = (buffer.readByte() === HANDSHAKE_SUCCESS_STATUS_CODE) - requestId = this._handshakeRequestId.toString(); - } - else { - // Request id - requestId = buffer.readLong().toString(); - // Status code - isSuccess = (buffer.readInteger() === REQUEST_SUCCESS_STATUS_CODE); - } + let offset = 0; + while (offset < message.length) { + let buffer = MessageBuffer.from(message, offset); + // Response length + const length = buffer.readInteger(); + offset += length + BinaryUtils.getSize(BinaryUtils.TYPE_CODE.INTEGER); + let requestId, isSuccess; + const isHandshake = this._state === STATE.HANDSHAKE; - this._logMessage(requestId, false, buffer.data); - - if (this._requests.has(requestId)) { - const request = this._requests.get(requestId); - this._requests.delete(requestId); if (isHandshake) { - await this._finalizeHandshake(buffer, request, isSuccess); + // Handshake status + isSuccess = (buffer.readByte() === HANDSHAKE_SUCCESS_STATUS_CODE) + requestId = this._handshakeRequestId.toString(); } else { - await this._finalizeResponse(buffer, request, isSuccess); + // Request id + requestId = buffer.readLong().toString(); + // Status code + isSuccess = (buffer.readInteger() === REQUEST_SUCCESS_STATUS_CODE); + } + + this._logMessage(requestId, false, buffer.data); + + if (this._requests.has(requestId)) { + const request = this._requests.get(requestId); + this._requests.delete(requestId); + if (isHandshake) { + await this._finalizeHandshake(buffer, request, isSuccess); + } + else { + await this._finalizeResponse(buffer, request, isSuccess); + } + } + else { + throw Errors.IgniteClientError.internalError('Invalid response id: ' + requestId); } - } - else { - throw Errors.IgniteClientError.internalError('Invalid response id: ' + requestId); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d6efe896/modules/platforms/nodejs/lib/internal/MessageBuffer.js ---------------------------------------------------------------------- diff --git a/modules/platforms/nodejs/lib/internal/MessageBuffer.js b/modules/platforms/nodejs/lib/internal/MessageBuffer.js index b5f94f8..bace45f 100644 --- a/modules/platforms/nodejs/lib/internal/MessageBuffer.js +++ b/modules/platforms/nodejs/lib/internal/MessageBuffer.js @@ -33,9 +33,10 @@ class MessageBuffer { this._position = 0; } - static from(source) { + static from(source, position) { const buf = new MessageBuffer(); buf._buffer = Buffer.from(source); + buf._position = position; buf._length = buf._buffer.length; buf._capacity = buf._length; return buf;
