This is an automated email from the ASF dual-hosted git repository. isapego pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite-nodejs-thin-client.git
commit e9cf4afbef75175ab7a86a1fb3e4b46fcdfd73df Author: Toru Yabuki <[email protected]> AuthorDate: Tue Sep 4 14:52:03 2018 +0900 IGNITE-9382: Fix Node.js client to process large payloads. - Fixes #4629. Signed-off-by: shroman <[email protected]> --- lib/internal/ClientSocket.js | 35 ++++++++++++++++++++++++++--------- lib/internal/MessageBuffer.js | 6 ++++++ 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/lib/internal/ClientSocket.js b/lib/internal/ClientSocket.js index 1d1a4dd..73e11f0 100644 --- a/lib/internal/ClientSocket.js +++ b/lib/internal/ClientSocket.js @@ -110,6 +110,8 @@ class ClientSocket { this._onSocketDisconnect = onSocketDisconnect; this._error = null; this._wasConnected = false; + this._buffer = null; + this._offset = 0; } async connect() { @@ -198,28 +200,43 @@ class ClientSocket { if (this._state === STATE.DISCONNECTED) { return; } - let offset = 0; - while (offset < message.length) { - let buffer = MessageBuffer.from(message, offset); + if (this._buffer) { + this._buffer.concat(message); + this._buffer.position = this._offset; + } + else { + this._buffer = MessageBuffer.from(message, 0); + } + while (this._buffer && this._offset < this._buffer.length) { // Response length - const length = buffer.readInteger(); - offset += length + BinaryUtils.getSize(BinaryUtils.TYPE_CODE.INTEGER); + const length = this._buffer.readInteger() + BinaryUtils.getSize(BinaryUtils.TYPE_CODE.INTEGER); + if (this._buffer.length < this._offset + length) { + break; + } + this._offset += length; + let requestId, isSuccess; const isHandshake = this._state === STATE.HANDSHAKE; if (isHandshake) { // Handshake status - isSuccess = (buffer.readByte() === HANDSHAKE_SUCCESS_STATUS_CODE) + isSuccess = (this._buffer.readByte() === HANDSHAKE_SUCCESS_STATUS_CODE); requestId = this._handshakeRequestId.toString(); } else { // Request id - requestId = buffer.readLong().toString(); + requestId = this._buffer.readLong().toString(); // Status code - isSuccess = 
(buffer.readInteger() === REQUEST_SUCCESS_STATUS_CODE); + isSuccess = (this._buffer.readInteger() === REQUEST_SUCCESS_STATUS_CODE); } - this._logMessage(requestId, false, buffer.data); + this._logMessage(requestId, false, this._buffer.data); + + const buffer = this._buffer; + if (this._offset === this._buffer.length) { + this._buffer = null; + this._offset = 0; + } if (this._requests.has(requestId)) { const request = this._requests.get(requestId); diff --git a/lib/internal/MessageBuffer.js b/lib/internal/MessageBuffer.js index f1407bf..dc3e928 100644 --- a/lib/internal/MessageBuffer.js +++ b/lib/internal/MessageBuffer.js @@ -42,6 +42,12 @@ class MessageBuffer { return buf; } + concat(source) { + this._buffer = Buffer.concat([this._buffer, source]); + this._length = this._buffer.length; + this._capacity = this._length; + } + get position() { return this._position; }
