This is an automated email from the ASF dual-hosted git repository. isapego pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite-nodejs-thin-client.git
commit b85c1a10a65300e296bd87d0a7a24fedd9668216 Author: ekaterina-nbl <[email protected]> AuthorDate: Thu Aug 16 17:44:04 2018 +0300 IGNITE-9258: Node.js now can handle more than one client in the same app This closes #4554 --- lib/BinaryObject.js | 39 ++-- lib/CacheClient.js | 48 +++-- lib/CacheConfiguration.js | 185 ++++++++--------- lib/Cursor.js | 22 +- lib/EnumItem.js | 13 +- lib/IgniteClient.js | 26 ++- lib/Query.js | 38 ++-- lib/internal/BinaryCommunicator.js | 409 +++++++++++++++++++++++++++++++++++++ lib/internal/BinaryReader.js | 197 ------------------ lib/internal/BinaryType.js | 33 ++- lib/internal/BinaryTypeStorage.js | 43 ++-- lib/internal/BinaryWriter.js | 210 ------------------- lib/internal/ClientSocket.js | 11 +- spec/examples/AuthExample.spec.js | 5 + spec/examples/Examples.spec.js | 5 + 15 files changed, 642 insertions(+), 642 deletions(-) diff --git a/lib/BinaryObject.js b/lib/BinaryObject.js index 2cc6be6..fb139da 100644 --- a/lib/BinaryObject.js +++ b/lib/BinaryObject.js @@ -25,7 +25,6 @@ const BinaryUtils = require('./internal/BinaryUtils'); const BinaryType = require('./internal/BinaryType'); const BinaryField = require('./internal/BinaryType').BinaryField; const BinaryTypeBuilder = require('./internal/BinaryType').BinaryTypeBuilder; -const BinaryWriter = require('./internal/BinaryWriter'); const ArgumentChecker = require('./internal/ArgumentChecker'); const Logger = require('./internal/Logger'); @@ -283,28 +282,28 @@ class BinaryObject { /** * @ignore */ - static async _fromBuffer(buffer) { + static async _fromBuffer(communicator, buffer) { const result = new BinaryObject(new ComplexObjectType({})._typeName); result._buffer = buffer; result._startPos = buffer.position; - await result._read(); + await result._read(communicator); return result; } /** * @ignore */ - async _write(buffer) { + async _write(communicator, buffer) { if (this._buffer && !this._modified) { buffer.writeBuffer(this._buffer.buffer, this._startPos, this._startPos + this._length); } else { - await this._typeBuilder.finalize(); + await this._typeBuilder.finalize(communicator); this._startPos = buffer.position; buffer.position = this._startPos + HEADER_LENGTH; // write fields for (let field of this._fields.values()) { - await field._writeValue(buffer, this._typeBuilder.getField(field.id).typeCode); + await field._writeValue(communicator, buffer, this._typeBuilder.getField(field.id).typeCode); } this._schemaOffset = buffer.position - this._startPos; // write schema @@ -351,8 +350,8 @@ class BinaryObject { /** * @ignore */ - async _read() { - await this._readHeader(); + async _read(communicator) { + await this._readHeader(communicator); this._buffer.position = this._startPos + this._schemaOffset; const fieldOffsets = new Array(); const fieldIds = this._typeBuilder._schema.fieldIds; @@ -382,7 +381,7 @@ class BinaryObject { offset = fieldOffsets[i][1]; nextOffset = i + 1 < fieldOffsets.length ? fieldOffsets[i + 1][1] : this._schemaOffset; field = BinaryObjectField._fromBuffer( - this._buffer, this._startPos + offset, nextOffset - offset, fieldId); + communicator,this._buffer, this._startPos + offset, nextOffset - offset, fieldId); this._fields.set(field.id, field); } this._buffer.position = this._startPos + this._length; @@ -391,7 +390,7 @@ class BinaryObject { /** * @ignore */ - async _readHeader() { + async _readHeader(communicator) { // type code this._buffer.readByte(); // version @@ -419,7 +418,7 @@ class BinaryObject { BinaryUtils.TYPE_CODE.SHORT : BinaryUtils.TYPE_CODE.INTEGER; - if (BinaryObject._isFlagSet(FLAG_HAS_RAW_DATA)) { + if (BinaryObject._isFlagSet(flags, FLAG_HAS_RAW_DATA)) { throw Errors.IgniteClientError.serializationError( false, 'complex objects with raw data are not supported'); } @@ -427,7 +426,7 @@ class BinaryObject { throw Errors.IgniteClientError.serializationError( false, 'schema is absent for object with compact footer'); } - this._typeBuilder = await BinaryTypeBuilder.fromTypeId(typeId, schemaId, hasSchema); + this._typeBuilder = await BinaryTypeBuilder.fromTypeId(communicator, typeId, schemaId, hasSchema); } } @@ -460,31 +459,35 @@ class BinaryObjectField { async getValue(type = null) { if (this._value === undefined || this._buffer && this._type !== type) { this._buffer.position = this._offset; - const BinaryReader = require('./internal/BinaryReader'); - this._value = await BinaryReader.readObject(this._buffer, type); + this._value = await this._communicator.readObject(this._buffer, type); this._type = type; } return this._value; } - static _fromBuffer(buffer, offset, length, id) { + static _fromBuffer(communicator, buffer, offset, length, id) { const result = new BinaryObjectField(null); result._id = id; + result._communicator = communicator; result._buffer = buffer; result._offset = offset; result._length = length; return result; } - async _writeValue(buffer, expectedTypeCode) { + async _writeValue(communicator, buffer, expectedTypeCode) { const offset = buffer.position; - if (this._buffer) { + if (this._buffer && this._communicator === communicator) { buffer.writeBuffer(this._buffer.buffer, this._offset, this._offset + this._length); } else { + if (this._value === undefined) { + await this.getValue(); + } BinaryUtils.checkCompatibility(this._value, expectedTypeCode); - await BinaryWriter.writeObject(buffer, this._value, this._type); + await communicator.writeObject(buffer, this._value, this._type); } + this._communicator = communicator; this._buffer = buffer; this._length = buffer.position - offset; this._offset = offset; diff --git a/lib/CacheClient.js b/lib/CacheClient.js index b76471f..f59910b 100644 --- a/lib/CacheClient.js +++ b/lib/CacheClient.js @@ -18,8 +18,6 @@ 'use strict'; const BinaryUtils = require('./internal/BinaryUtils'); -const BinaryReader = require('./internal/BinaryReader'); -const BinaryWriter = require('./internal/BinaryWriter'); const ArgumentChecker = require('./internal/ArgumentChecker'); const SqlQuery = require('./Query').SqlQuery; const SqlFieldsQuery = require('./Query').SqlFieldsQuery; @@ -156,7 +154,7 @@ class CacheClient { ArgumentChecker.notEmpty(keys, 'keys'); ArgumentChecker.hasType(keys, 'keys', false, Array); let result = null; - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.CACHE_GET_ALL, async (payload) => { this._writeCacheInfo(payload); @@ -167,8 +165,8 @@ class CacheClient { result = new Array(resultCount); for (let i = 0; i < resultCount; i++) { result[i] = new CacheEntry( - await BinaryReader.readObject(payload, this._getKeyType()), - await BinaryReader.readObject(payload, this._getValueType())); + await this._communicator.readObject(payload, this._getKeyType()), + await this._communicator.readObject(payload, this._getValueType())); } }); return result; @@ -206,7 +204,7 @@ class CacheClient { async putAll(entries) { ArgumentChecker.notEmpty(entries, 'entries'); ArgumentChecker.hasType(entries, 'entries', true, CacheEntry); - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.CACHE_PUT_ALL, async (payload) => { this._writeCacheInfo(payload); @@ -374,12 +372,12 @@ class CacheClient { ArgumentChecker.notNull(value, 'value'); ArgumentChecker.notNull(newValue, 'newValue'); let result; - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.CACHE_REPLACE_IF_EQUALS, async (payload) => { this._writeCacheInfo(payload); await this._writeKeyValue(payload, key, value); - await BinaryWriter.writeObject(payload, newValue, this._getValueType()); + await this._communicator.writeObject(payload, newValue, this._getValueType()); }, async (payload) => { result = payload.readBoolean(); @@ -395,7 +393,7 @@ class CacheClient { * @throws {IgniteClientError} if error. */ async clear() { - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.CACHE_CLEAR, async (payload) => { this._writeCacheInfo(payload); @@ -481,7 +479,7 @@ class CacheClient { * @throws {IgniteClientError} if error. */ async removeAll() { - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.CACHE_REMOVE_ALL, async (payload) => { this._writeCacheInfo(payload); @@ -502,7 +500,7 @@ class CacheClient { async getSize(...peekModes) { ArgumentChecker.hasValueFrom(peekModes, 'peekModes', true, CacheClient.PEEK_MODE); let result; - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.CACHE_GET_SIZE, async (payload) => { this._writeCacheInfo(payload); @@ -537,14 +535,14 @@ class CacheClient { ArgumentChecker.hasType(query, 'query', false, SqlQuery, SqlFieldsQuery, ScanQuery); let value = null; - await this._socket.send( + await this._communicator.send( query._operation, async (payload) => { this._writeCacheInfo(payload); - await query._write(payload); + await query._write(this._communicator, payload); }, async (payload) => { - value = await query._getCursor(this._socket, payload, this._keyType, this._valueType); + value = await query._getCursor(this._communicator, payload, this._keyType, this._valueType); }); return value; } @@ -554,13 +552,13 @@ class CacheClient { /** * @ignore */ - constructor(name, config, socket) { + constructor(name, config, communicator) { this._name = name; this._cacheId = CacheClient._calculateId(this._name); this._config = config; this._keyType = null; this._valueType = null; - this._socket = socket; + this._communicator = communicator; } /** @@ -582,8 +580,8 @@ class CacheClient { * @ignore */ async _writeKeyValue(payload, key, value) { - await BinaryWriter.writeObject(payload, key, this._getKeyType()); - await BinaryWriter.writeObject(payload, value, this._getValueType()); + await this._communicator.writeObject(payload, key, this._getKeyType()); + await this._communicator.writeObject(payload, value, this._getValueType()); } /** @@ -592,7 +590,7 @@ class CacheClient { async _writeKeys(payload, keys) { payload.writeInteger(keys.length); for (let key of keys) { - await BinaryWriter.writeObject(payload, key, this._getKeyType()); + await this._communicator.writeObject(payload, key, this._getKeyType()); } } @@ -616,7 +614,7 @@ class CacheClient { async _writeKeyValueOp(operation, key, value, payloadReader = null) { ArgumentChecker.notNull(key, 'key'); ArgumentChecker.notNull(value, 'value'); - await this._socket.send( + await this._communicator.send( operation, async (payload) => { this._writeCacheInfo(payload); @@ -633,7 +631,7 @@ class CacheClient { await this._writeKeyValueOp( operation, key, value, async (payload) => { - result = await BinaryReader.readObject(payload, this._getValueType()); + result = await this._communicator.readObject(payload, this._getValueType()); }); return result; } @@ -656,11 +654,11 @@ class CacheClient { */ async _writeKeyOp(operation, key, payloadReader = null) { ArgumentChecker.notNull(key, 'key'); - await this._socket.send( + await this._communicator.send( operation, async (payload) => { this._writeCacheInfo(payload); - await BinaryWriter.writeObject(payload, key, this._getKeyType()); + await this._communicator.writeObject(payload, key, this._getKeyType()); }, payloadReader); } @@ -673,7 +671,7 @@ class CacheClient { await this._writeKeyOp( operation, key, async (payload) => { - value = await BinaryReader.readObject(payload, this._getValueType()); + value = await this._communicator.readObject(payload, this._getValueType()); }); return value; } @@ -697,7 +695,7 @@ class CacheClient { async _writeKeysOp(operation, keys, payloadReader = null) { ArgumentChecker.notEmpty(keys, 'keys'); ArgumentChecker.hasType(keys, 'keys', false, Array); - await this._socket.send( + await this._communicator.send( operation, async (payload) => { this._writeCacheInfo(payload); diff --git a/lib/CacheConfiguration.js b/lib/CacheConfiguration.js index a4e4574..ccf20b9 100644 --- a/lib/CacheConfiguration.js +++ b/lib/CacheConfiguration.js @@ -20,8 +20,7 @@ const ComplexObjectType = require('./ObjectType').ComplexObjectType; const ObjectArrayType = require('./ObjectType').ObjectArrayType; const BinaryUtils = require('./internal/BinaryUtils'); -const BinaryReader = require('./internal/BinaryReader'); -const BinaryWriter = require('./internal/BinaryWriter'); +const BinaryCommunicator = require('./internal/BinaryCommunicator'); const ArgumentChecker = require('./internal/ArgumentChecker'); const Errors = require('./Errors'); @@ -94,17 +93,17 @@ class CacheKeyConfiguration { /** * @ignore */ - async _write(buffer) { - await BinaryWriter.writeString(buffer, this._typeName); - await BinaryWriter.writeString(buffer, this._affinityKeyFieldName); + async _write(communicator, buffer) { + BinaryCommunicator.writeString(buffer, this._typeName); + BinaryCommunicator.writeString(buffer, this._affinityKeyFieldName); } /** * @ignore */ - async _read(buffer) { - this._typeName = await BinaryReader.readObject(buffer); - this._affinityKeyFieldName = await BinaryReader.readObject(buffer); + async _read(communicator, buffer) { + this._typeName = BinaryCommunicator.readString(buffer); + this._affinityKeyFieldName = BinaryCommunicator.readString(buffer); } } @@ -306,27 +305,27 @@ class QueryEntity { /** * @ignore */ - async _write(buffer) { - await BinaryWriter.writeString(buffer, this._keyTypeName); - await BinaryWriter.writeString(buffer, this._valueTypeName); - await BinaryWriter.writeString(buffer, this._tableName); - await BinaryWriter.writeString(buffer, this._keyFieldName); - await BinaryWriter.writeString(buffer, this._valueFieldName); - await this._writeSubEntities(buffer, this._fields); - await this._writeAliases(buffer); - await this._writeSubEntities(buffer, this._indexes); + async _write(communicator, buffer) { + BinaryCommunicator.writeString(buffer, this._keyTypeName); + BinaryCommunicator.writeString(buffer, this._valueTypeName); + BinaryCommunicator.writeString(buffer, this._tableName); + BinaryCommunicator.writeString(buffer, this._keyFieldName); + BinaryCommunicator.writeString(buffer, this._valueFieldName); + await this._writeSubEntities(communicator, buffer, this._fields); + await this._writeAliases(communicator, buffer); + await this._writeSubEntities(communicator, buffer, this._indexes); } /** * @ignore */ - async _writeAliases(buffer) { + async _writeAliases(communicator, buffer) { const length = this._aliases ? this._aliases.size : 0; buffer.writeInteger(length); if (length > 0) { for (let [key, value] of this._aliases.entries()) { - await BinaryWriter.writeString(buffer, key); - await BinaryWriter.writeString(buffer, value); + BinaryCommunicator.writeString(buffer, key); + BinaryCommunicator.writeString(buffer, value); } } } @@ -334,12 +333,12 @@ class QueryEntity { /** * @ignore */ - async _writeSubEntities(buffer, entities) { + async _writeSubEntities(communicator, buffer, entities) { const length = entities ? entities.length : 0; buffer.writeInteger(length); if (length > 0) { for (let entity of entities) { - await entity._write(buffer); + await entity._write(communicator, buffer); } } } @@ -347,28 +346,28 @@ class QueryEntity { /** * @ignore */ - async _read(buffer) { - this._keyTypeName = await BinaryReader.readObject(buffer); - this._valueTypeName = await BinaryReader.readObject(buffer); - this._tableName = await BinaryReader.readObject(buffer); - this._keyFieldName = await BinaryReader.readObject(buffer); - this._valueFieldName = await BinaryReader.readObject(buffer); - this._fields = await this._readSubEntities(buffer, QueryField); - await this._readAliases(buffer); - this._indexes = await this._readSubEntities(buffer, QueryIndex); + async _read(communicator, buffer) { + this._keyTypeName = await communicator.readObject(buffer); + this._valueTypeName = await communicator.readObject(buffer); + this._tableName = await communicator.readObject(buffer); + this._keyFieldName = await communicator.readObject(buffer); + this._valueFieldName = await communicator.readObject(buffer); + this._fields = await this._readSubEntities(communicator, buffer, QueryField); + await this._readAliases(communicator, buffer); + this._indexes = await this._readSubEntities(communicator, buffer, QueryIndex); } /** * @ignore */ - async _readSubEntities(buffer, objectConstructor) { + async _readSubEntities(communicator, buffer, objectConstructor) { const length = buffer.readInteger(buffer); const result = new Array(length); if (length > 0) { let res; for (let i = 0; i < length; i++) { res = new objectConstructor(); - await res._read(buffer); + await res._read(communicator, buffer); result[i] = res; } } @@ -378,13 +377,13 @@ class QueryEntity { /** * @ignore */ - async _readAliases(buffer) { + async _readAliases(communicator, buffer) { const length = buffer.readInteger(buffer); this._aliases = new Map(); if (length > 0) { let res; for (let i = 0; i < length; i++) { - this._aliases.set(await BinaryReader.readObject(buffer), await BinaryReader.readObject(buffer)); + this._aliases.set(await communicator.readObject(buffer), await communicator.readObject(buffer)); } } } @@ -416,6 +415,7 @@ class QueryField { this._precision = -1; this._scale = -1; this._valueType = null; + this._communicator = null; this._buffer = null; this._index = null; } @@ -538,7 +538,7 @@ class QueryField { if (this._buffer) { const position = this._buffer.position; this._buffer.position = this._index; - const result = await BinaryReader.readObject(this._buffer, valueType); + const result = await this._communicator.readObject(this._buffer, valueType); this._buffer.position = position; return result; } @@ -600,12 +600,12 @@ class QueryField { /** * @ignore */ - async _write(buffer) { - await BinaryWriter.writeString(buffer, this._name); - await BinaryWriter.writeString(buffer, this._typeName); + async _write(communicator, buffer) { + BinaryCommunicator.writeString(buffer, this._name); + BinaryCommunicator.writeString(buffer, this._typeName); buffer.writeBoolean(this._isKeyField); buffer.writeBoolean(this._isNotNull); - await BinaryWriter.writeObject(buffer, this._defaultValue ? this._defaultValue : null, this._valueType); + await communicator.writeObject(buffer, this._defaultValue ? this._defaultValue : null, this._valueType); buffer.writeInteger(this._precision); buffer.writeInteger(this._scale); } @@ -613,15 +613,16 @@ class QueryField { /** * @ignore */ - async _read(buffer) { - this._name = await BinaryReader.readObject(buffer); - this._typeName = await BinaryReader.readObject(buffer); + async _read(communicator, buffer) { + this._name = await communicator.readObject(buffer); + this._typeName = await communicator.readObject(buffer); this._isKeyField = buffer.readBoolean(); this._isNotNull = buffer.readBoolean(); this._defaultValue = undefined; + this._communicator = communicator; this._buffer = buffer; this._index = buffer.position; - await BinaryReader.readObject(buffer); + await communicator.readObject(buffer); this._precision = buffer.readInteger(); this._scale = buffer.readInteger(); } @@ -732,7 +733,7 @@ class QueryIndex { * * @return {number} */ - getInlineSize() { + getInlineSize() { return this._inlineSize; } @@ -762,8 +763,8 @@ class QueryIndex { /** * @ignore */ - async _write(buffer) { - await BinaryWriter.writeString(buffer, this._name); + async _write(communicator, buffer) { + BinaryCommunicator.writeString(buffer, this._name); buffer.writeByte(this._type); buffer.writeInteger(this._inlineSize); // write fields @@ -771,7 +772,7 @@ class QueryIndex { buffer.writeInteger(length); if (length > 0) { for (let [key, value] of this._fields.entries()) { - await BinaryWriter.writeString(buffer, key); + BinaryCommunicator.writeString(buffer, key); buffer.writeBoolean(value); } } @@ -780,8 +781,8 @@ class QueryIndex { /** * @ignore */ - async _read(buffer) { - this._name = await BinaryReader.readObject(buffer); + async _read(communicator, buffer) { + this._name = await communicator.readObject(buffer); this._type = buffer.readByte(); this._inlineSize = buffer.readInteger(); // read fields @@ -790,7 +791,7 @@ class QueryIndex { if (length > 0) { let res; for (let i = 0; i < length; i++) { - this._fields.set(await BinaryReader.readObject(buffer), buffer.readBoolean()); + this._fields.set(await communicator.readObject(buffer), buffer.readBoolean()); } } } @@ -1610,7 +1611,7 @@ class CacheConfiguration { /** * @ignore */ - async _write(buffer, name) { + async _write(communicator, buffer, name) { this._properties.set(PROP_NAME, name); const startPos = buffer.position; @@ -1619,7 +1620,7 @@ class CacheConfiguration { BinaryUtils.getSize(BinaryUtils.TYPE_CODE.SHORT); for (let [propertyCode, property] of this._properties) { - await this._writeProperty(buffer, propertyCode, property); + await this._writeProperty(communicator, buffer, propertyCode, property); } const length = buffer.position - startPos; @@ -1632,23 +1633,23 @@ class CacheConfiguration { /** * @ignore */ - async _writeProperty(buffer, propertyCode, property) { + async _writeProperty(communicator, buffer, propertyCode, property) { buffer.writeShort(propertyCode); const propertyType = PROP_TYPES[propertyCode]; switch (BinaryUtils.getTypeCode(propertyType)) { case BinaryUtils.TYPE_CODE.INTEGER: case BinaryUtils.TYPE_CODE.LONG: case BinaryUtils.TYPE_CODE.BOOLEAN: - await BinaryWriter.writeObject(buffer, property, propertyType, false); + await communicator.writeObject(buffer, property, propertyType, false); return; case BinaryUtils.TYPE_CODE.STRING: - await BinaryWriter.writeObject(buffer, property, propertyType); + await communicator.writeObject(buffer, property, propertyType); return; case BinaryUtils.TYPE_CODE.OBJECT_ARRAY: const length = property ? property.length : 0; buffer.writeInteger(length); for (let prop of property) { - await prop._write(buffer); + await prop._write(communicator, buffer); } return; default: @@ -1659,54 +1660,54 @@ class CacheConfiguration { /** * @ignore */ - async _read(buffer) { + async _read(communicator, buffer) { // length buffer.readInteger(); - await this._readProperty(buffer, PROP_ATOMICITY_MODE); - await this._readProperty(buffer, PROP_BACKUPS); - await this._readProperty(buffer, PROP_CACHE_MODE); - await this._readProperty(buffer, PROP_COPY_ON_READ); - await this._readProperty(buffer, PROP_DATA_REGION_NAME); - await this._readProperty(buffer, PROP_EAGER_TTL); - await this._readProperty(buffer, PROP_STATISTICS_ENABLED); - await this._readProperty(buffer, PROP_GROUP_NAME); - await this._readProperty(buffer, PROP_DEFAULT_LOCK_TIMEOUT); - await this._readProperty(buffer, PROP_MAX_CONCURRENT_ASYNC_OPS); - await this._readProperty(buffer, PROP_MAX_QUERY_ITERATORS); - await this._readProperty(buffer, PROP_NAME); - await this._readProperty(buffer, PROP_IS_ONHEAP_CACHE_ENABLED); - await this._readProperty(buffer, PROP_PARTITION_LOSS_POLICY); - await this._readProperty(buffer, PROP_QUERY_DETAIL_METRICS_SIZE); - await this._readProperty(buffer, PROP_QUERY_PARALLELISM); - await this._readProperty(buffer, PROP_READ_FROM_BACKUP); - await this._readProperty(buffer, PROP_REBALANCE_BATCH_SIZE); - await this._readProperty(buffer, PROP_REBALANCE_BATCHES_PREFETCH_COUNT); - await this._readProperty(buffer, PROP_REBALANCE_DELAY); - await this._readProperty(buffer, PROP_REBALANCE_MODE); - await this._readProperty(buffer, PROP_REBALANCE_ORDER); - await this._readProperty(buffer, PROP_REBALANCE_THROTTLE); - await this._readProperty(buffer, PROP_REBALANCE_TIMEOUT); - await this._readProperty(buffer, PROP_SQL_ESCAPE_ALL); - await this._readProperty(buffer, PROP_SQL_INDEX_INLINE_MAX_SIZE); - await this._readProperty(buffer, PROP_SQL_SCHEMA); - await this._readProperty(buffer, PROP_WRITE_SYNCHRONIZATION_MODE); - await this._readProperty(buffer, PROP_CACHE_KEY_CONFIGURATION); - await this._readProperty(buffer, PROP_QUERY_ENTITY); + await this._readProperty(communicator, buffer, PROP_ATOMICITY_MODE); + await this._readProperty(communicator, buffer, PROP_BACKUPS); + await this._readProperty(communicator, buffer, PROP_CACHE_MODE); + await this._readProperty(communicator, buffer, PROP_COPY_ON_READ); + await this._readProperty(communicator, buffer, PROP_DATA_REGION_NAME); + await this._readProperty(communicator, buffer, PROP_EAGER_TTL); + await this._readProperty(communicator, buffer, PROP_STATISTICS_ENABLED); + await this._readProperty(communicator, buffer, PROP_GROUP_NAME); + await this._readProperty(communicator, buffer, PROP_DEFAULT_LOCK_TIMEOUT); + await this._readProperty(communicator, buffer, PROP_MAX_CONCURRENT_ASYNC_OPS); + await this._readProperty(communicator, buffer, PROP_MAX_QUERY_ITERATORS); + await this._readProperty(communicator, buffer, PROP_NAME); + await this._readProperty(communicator, buffer, PROP_IS_ONHEAP_CACHE_ENABLED); + await this._readProperty(communicator, buffer, PROP_PARTITION_LOSS_POLICY); + await this._readProperty(communicator, buffer, PROP_QUERY_DETAIL_METRICS_SIZE); + await this._readProperty(communicator, buffer, PROP_QUERY_PARALLELISM); + await this._readProperty(communicator, buffer, PROP_READ_FROM_BACKUP); + await this._readProperty(communicator, buffer, PROP_REBALANCE_BATCH_SIZE); + await this._readProperty(communicator, buffer, PROP_REBALANCE_BATCHES_PREFETCH_COUNT); + await this._readProperty(communicator, buffer, PROP_REBALANCE_DELAY); + await this._readProperty(communicator, buffer, PROP_REBALANCE_MODE); + await this._readProperty(communicator, buffer, PROP_REBALANCE_ORDER); + await this._readProperty(communicator, buffer, PROP_REBALANCE_THROTTLE); + await this._readProperty(communicator, buffer, PROP_REBALANCE_TIMEOUT); + await this._readProperty(communicator, buffer, PROP_SQL_ESCAPE_ALL); + await this._readProperty(communicator, buffer, PROP_SQL_INDEX_INLINE_MAX_SIZE); + await this._readProperty(communicator, buffer, PROP_SQL_SCHEMA); + await this._readProperty(communicator, buffer, PROP_WRITE_SYNCHRONIZATION_MODE); + await this._readProperty(communicator, buffer, PROP_CACHE_KEY_CONFIGURATION); + await this._readProperty(communicator, buffer, PROP_QUERY_ENTITY); } /** * @ignore */ - async _readProperty(buffer, propertyCode) { + async _readProperty(communicator, buffer, propertyCode) { const propertyType = PROP_TYPES[propertyCode]; switch (BinaryUtils.getTypeCode(propertyType)) { case BinaryUtils.TYPE_CODE.INTEGER: case BinaryUtils.TYPE_CODE.LONG: case BinaryUtils.TYPE_CODE.BOOLEAN: - this._properties.set(propertyCode, await BinaryReader._readTypedObject(buffer, propertyType)); + this._properties.set(propertyCode, await communicator._readTypedObject(buffer, propertyType)); return; case BinaryUtils.TYPE_CODE.STRING: - this._properties.set(propertyCode, await BinaryReader.readObject(buffer, propertyType)); + this._properties.set(propertyCode, await communicator.readObject(buffer, propertyType)); return; case BinaryUtils.TYPE_CODE.OBJECT_ARRAY: const length = buffer.readInteger(); @@ -1714,7 +1715,7 @@ class CacheConfiguration { const properties = new Array(length); for (let i = 0; i < length; i++) { const property = new propertyType._elementType._objectConstructor(); - await property._read(buffer); + await property._read(communicator, buffer); properties[i] = property; } this._properties.set(propertyCode, properties); diff --git a/lib/Cursor.js b/lib/Cursor.js index 85176e3..39eea21 100644 --- a/lib/Cursor.js +++ b/lib/Cursor.js @@ -20,8 +20,6 @@ const Errors = require('./Errors'); const BinaryUtils = require('./internal/BinaryUtils'); const BinaryObject = require('./BinaryObject'); -const BinaryReader = require('./internal/BinaryReader'); -const BinaryWriter = require('./internal/BinaryWriter'); /** * Class representing a cursor to obtain results of SQL and Scan query operations. @@ -101,7 +99,7 @@ class Cursor { async close() { // Close cursor only if the server has more pages: the server closes cursor automatically on last page if (this._id && this._hasNext) { - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.RESOURCE_CLOSE, async (payload) => { await this._write(payload); @@ -114,8 +112,8 @@ class Cursor { /** * @ignore */ - constructor(socket, operation, buffer, keyType = null, valueType = null) { - this._socket = socket; + constructor(communicator, operation, buffer, keyType = null, valueType = null) { + this._communicator = communicator; this._operation = operation; this._buffer = buffer; this._keyType = keyType; @@ -133,7 +131,7 @@ class Cursor { this._hasNext = false; this._values = null; this._buffer = null; - await this._socket.send( + await this._communicator.send( this._operation, async (payload) => { await this._write(payload); @@ -175,8 +173,8 @@ class Cursor { async _readRow(buffer) { const CacheEntry = require('./CacheClient').CacheEntry; return new CacheEntry( - await BinaryReader.readObject(buffer, this._keyType), - await BinaryReader.readObject(buffer, this._valueType)); + await this._communicator.readObject(buffer, this._keyType), + await this._communicator.readObject(buffer, this._valueType)); } /** @@ -273,8 +271,8 @@ class SqlFieldsCursor extends Cursor { /** * @ignore */ - constructor(socket, buffer) { - super(socket, BinaryUtils.OPERATION.QUERY_SQL_FIELDS_CURSOR_GET_PAGE, buffer); + constructor(communicator, buffer) { + super(communicator, BinaryUtils.OPERATION.QUERY_SQL_FIELDS_CURSOR_GET_PAGE, buffer); this._fieldNames = []; } @@ -286,7 +284,7 @@ class SqlFieldsCursor extends Cursor { this._fieldCount = buffer.readInteger(); if (includeFieldNames) { for (let i = 0; i < this._fieldCount; i++) { - this._fieldNames[i] = await BinaryReader.readObject(buffer); + this._fieldNames[i] = await this._communicator.readObject(buffer); } } } @@ -299,7 +297,7 @@ class SqlFieldsCursor extends Cursor { let fieldType; for (let i = 0; i < this._fieldCount; i++) { fieldType = this._fieldTypes && i < this._fieldTypes.length ? this._fieldTypes[i] : null; - values[i] = await BinaryReader.readObject(buffer); + values[i] = await this._communicator.readObject(buffer, fieldType); } return values; } diff --git a/lib/EnumItem.js b/lib/EnumItem.js index e4fb165..1d1725e 100644 --- a/lib/EnumItem.js +++ b/lib/EnumItem.js @@ -156,14 +156,14 @@ class EnumItem { /** * @ignore */ - async _write(buffer) { + async _write(communicator, buffer) { buffer.writeInteger(this._typeId); if (this._ordinal !== null) { buffer.writeInteger(this._ordinal); return; } else if (this._name !== null || this._value !== null) { - const type = await this._getType(this._typeId); + const type = await this._getType(communicator, this._typeId); if (type._isEnum && type._enumValues) { for (let i = 0; i < type._enumValues.length; i++) { if (this._name === type._enumValues[i][0] || @@ -181,10 +181,10 @@ class EnumItem { /** * @ignore */ - async _read(buffer) { + async _read(communicator, buffer) { this._typeId = buffer.readInteger(); this._ordinal = buffer.readInteger(); - const type = await this._getType(this._typeId); + const type = await this._getType(communicator, this._typeId); if (!type._isEnum || !type._enumValues || type._enumValues.length <= this._ordinal) { throw new Errors.IgniteClientError('EnumItem can not be deserialized: type mismatch'); } @@ -195,9 +195,8 @@ class EnumItem { /** * @ignore */ - async _getType(typeId) { - const BinaryTypeStorage = require('./internal/BinaryTypeStorage'); - return await BinaryTypeStorage.getEntity().getType(typeId); + async _getType(communicator, typeId) { + return await communicator.typeStorage.getType(typeId); } } diff --git a/lib/IgniteClient.js b/lib/IgniteClient.js index ba3361f..544c37f 100644 --- a/lib/IgniteClient.js +++ b/lib/IgniteClient.js @@ -21,9 +21,7 @@ const CacheClient = require('./CacheClient'); const IgniteClientConfiguration = require('./IgniteClientConfiguration'); const CacheConfiguration = require('./CacheConfiguration'); const BinaryUtils = require('./internal/BinaryUtils'); -const BinaryWriter = require('./internal/BinaryWriter'); -const BinaryReader = require('./internal/BinaryReader'); -const BinaryTypeStorage = require('./internal/BinaryTypeStorage'); +const BinaryCommunicator = require('./internal/BinaryCommunicator'); const ArgumentChecker = require('./internal/ArgumentChecker'); const Logger = require('./internal/Logger'); @@ -70,7 +68,7 @@ class IgniteClient { constructor(onStateChanged = null) { const ClientFailoverSocket = require('./internal/ClientFailoverSocket'); this._socket = new ClientFailoverSocket(onStateChanged); - BinaryTypeStorage.createEntity(this._socket); + this._communicator = new BinaryCommunicator(this._socket); } static get STATE() { @@ -133,7 +131,7 @@ class IgniteClient { ArgumentChecker.notEmpty(name, 'name'); ArgumentChecker.hasType(cacheConfig, 'cacheConfig', false, CacheConfiguration); - await this._socket.send( + await this._communicator.send( cacheConfig ? BinaryUtils.OPERATION.CACHE_CREATE_WITH_CONFIGURATION : BinaryUtils.OPERATION.CACHE_CREATE_WITH_NAME, @@ -161,7 +159,7 @@ class IgniteClient { async getOrCreateCache(name, cacheConfig = null) { ArgumentChecker.notEmpty(name, 'name'); ArgumentChecker.hasType(cacheConfig, 'cacheConfig', false, CacheConfiguration); - await this._socket.send( + await this._communicator.send( cacheConfig ? BinaryUtils.OPERATION.CACHE_GET_OR_CREATE_WITH_CONFIGURATION : BinaryUtils.OPERATION.CACHE_GET_OR_CREATE_WITH_NAME, @@ -199,7 +197,7 @@ class IgniteClient { */ async destroyCache(name) { ArgumentChecker.notEmpty(name, 'name'); - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.CACHE_DESTROY, async (payload) => { payload.writeInteger(CacheClient._calculateId(name)); @@ -222,7 +220,7 @@ class IgniteClient { async getCacheConfiguration(name) { ArgumentChecker.notEmpty(name, 'name'); let config; - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.CACHE_GET_CONFIGURATION, async (payload) => { payload.writeInteger(CacheClient._calculateId(name)); @@ -230,7 +228,7 @@ class IgniteClient { }, async (payload) => { config = new CacheConfiguration(); - await config._read(payload); + await config._read(this._communicator, payload); }); return config; } @@ -248,11 +246,11 @@ class IgniteClient { */ async cacheNames() { let names; - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.CACHE_GET_NAMES, null, async (payload) => { - names = await BinaryReader.readStringArray(payload); + names = await this._communicator.readStringArray(payload); }); return names; } @@ -273,7 +271,7 @@ class IgniteClient { * @ignore */ _getCache(name, cacheConfig = null) { - return new CacheClient(name, cacheConfig, this._socket); + return new CacheClient(name, cacheConfig, this._communicator); } /** @@ -281,10 +279,10 @@ class IgniteClient { */ async _writeCacheNameOrConfig(buffer, name, cacheConfig) { if (cacheConfig) { - await cacheConfig._write(buffer, name); + await cacheConfig._write(this._communicator, buffer, name); } else { - await BinaryWriter.writeString(buffer, name); + BinaryCommunicator.writeString(buffer, name); } } } diff --git a/lib/Query.js b/lib/Query.js index 5c230df..029ec3d 100644 --- a/lib/Query.js +++ b/lib/Query.js @@ -20,7 +20,7 @@ const Cursor = require('./Cursor').Cursor; const SqlFieldsCursor = require('./Cursor').SqlFieldsCursor; const ArgumentChecker = require('./internal/ArgumentChecker'); -const BinaryWriter = require('./internal/BinaryWriter'); +const BinaryCommunicator = require('./internal/BinaryCommunicator'); const BinaryUtils = require('./internal/BinaryUtils'); const PAGE_SIZE_DEFAULT = 1024; @@ -220,10 +220,10 @@ class SqlQuery extends Query { /** * @ignore */ - async _write(buffer) { - await BinaryWriter.writeString(buffer, this._type); - await BinaryWriter.writeString(buffer, this._sql); - await this._writeArgs(buffer); + async _write(communicator, buffer) { + BinaryCommunicator.writeString(buffer, this._type); + BinaryCommunicator.writeString(buffer, this._sql); + await this._writeArgs(communicator, buffer); buffer.writeBoolean(this._distributedJoins); buffer.writeBoolean(this._local); buffer.writeBoolean(this._replicatedOnly); @@ -234,14 +234,14 @@ class SqlQuery extends Query { /** * @ignore */ - async _writeArgs(buffer) { + async _writeArgs(communicator, buffer) { const argsLength = this._args ? this._args.length : 0; buffer.writeInteger(argsLength); if (argsLength > 0) { let argType; for (let i = 0; i < argsLength; i++) { argType = this._argTypes && i < this._argTypes.length ? this._argTypes[i] : null; - await BinaryWriter.writeObject(buffer, this._args[i], argType); + await communicator.writeObject(buffer, this._args[i], argType); } } } @@ -249,8 +249,8 @@ class SqlQuery extends Query { /** * @ignore */ - async _getCursor(socket, payload, keyType = null, valueType = null) { - const cursor = new Cursor(socket, BinaryUtils.OPERATION.QUERY_SQL_CURSOR_GET_PAGE, payload, keyType, valueType); + async _getCursor(communicator, payload, keyType = null, valueType = null) { + const cursor = new Cursor(communicator, BinaryUtils.OPERATION.QUERY_SQL_CURSOR_GET_PAGE, payload, keyType, valueType); cursor._readId(payload); return cursor; } @@ -410,12 +410,12 @@ class SqlFieldsQuery extends SqlQuery { /** * @ignore */ - async _write(buffer) { - await BinaryWriter.writeString(buffer, this._schema); + async _write(communicator, buffer) { + BinaryCommunicator.writeString(buffer, this._schema); buffer.writeInteger(this._pageSize); buffer.writeInteger(this._maxRows); - await BinaryWriter.writeString(buffer, this._sql); - await this._writeArgs(buffer) + BinaryCommunicator.writeString(buffer, this._sql); + await this._writeArgs(communicator, buffer) buffer.writeByte(this._statementType); buffer.writeBoolean(this._distributedJoins); buffer.writeBoolean(this._local); @@ -430,8 +430,8 @@ class SqlFieldsQuery extends SqlQuery { /** * @ignore */ - async _getCursor(socket, payload, keyType = null, valueType = null) { - const cursor = new SqlFieldsCursor(socket, payload); + async _getCursor(communicator, payload, keyType = null, valueType = null) { + const cursor = new SqlFieldsCursor(communicator, payload); await cursor._readFieldNames(payload, this._includeFieldNames); return cursor; } @@ -485,9 +485,9 @@ class ScanQuery extends Query { /** * @ignore */ - async _write(buffer) { + async _write(communicator, buffer) { // filter - await BinaryWriter.writeObject(buffer, null); + await communicator.writeObject(buffer, null); buffer.writeInteger(this._pageSize); buffer.writeInteger(this._partitionNumber); buffer.writeBoolean(this._local); @@ -496,8 +496,8 @@ class ScanQuery extends Query { /** * @ignore */ - async _getCursor(socket, payload, keyType = null, valueType = null) { - const cursor = new Cursor(socket, BinaryUtils.OPERATION.QUERY_SCAN_CURSOR_GET_PAGE, payload, keyType, valueType); + async _getCursor(communicator, payload, keyType = null, valueType = null) { + const cursor = new Cursor(communicator, BinaryUtils.OPERATION.QUERY_SCAN_CURSOR_GET_PAGE, payload, keyType, valueType); cursor._readId(payload); return cursor; } diff --git a/lib/internal/BinaryCommunicator.js b/lib/internal/BinaryCommunicator.js new file mode 100644 index 0000000..9418d36 --- /dev/null +++ b/lib/internal/BinaryCommunicator.js @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const Decimal = require('decimal.js'); +const CollectionObjectType = require('../ObjectType').CollectionObjectType; +const ComplexObjectType = require('../ObjectType').ComplexObjectType; +const Errors = require('../Errors'); +const Timestamp = require('../Timestamp'); +const EnumItem = require('../EnumItem'); +const BinaryUtils = require('./BinaryUtils'); +const BinaryTypeStorage = require('./BinaryTypeStorage'); + +class BinaryCommunicator { + + constructor(socket) { + this._socket = socket; + this._typeStorage = new BinaryTypeStorage(this); + } + + static readString(buffer) { + const typeCode = buffer.readByte(); + BinaryUtils.checkTypesComatibility(BinaryUtils.TYPE_CODE.STRING, typeCode); + if (typeCode === BinaryUtils.TYPE_CODE.NULL) { + return null; + } + return buffer.readString(); + } + + static writeString(buffer, value) { + if (value === null) { + buffer.writeByte(BinaryUtils.TYPE_CODE.NULL); + } + else { + buffer.writeByte(BinaryUtils.TYPE_CODE.STRING); + buffer.writeString(value); + } + } + + async send(opCode, payloadWriter, payloadReader = null) { + await this._socket.send(opCode, payloadWriter, payloadReader); + } + + get typeStorage() { + return this._typeStorage; + } + + async readObject(buffer, expectedType = null) { + const typeCode = buffer.readByte(); + BinaryUtils.checkTypesComatibility(expectedType, typeCode); + return await this._readTypedObject(buffer, typeCode, expectedType); + } + + async readStringArray(buffer) { + return await this._readTypedObject(buffer, BinaryUtils.TYPE_CODE.STRING_ARRAY); + } + + async writeObject(buffer, object, objectType = null, writeObjectType = true) { + BinaryUtils.checkCompatibility(object, objectType); + if (object === null) { + buffer.writeByte(BinaryUtils.TYPE_CODE.NULL); + return; + } + + objectType = objectType ? objectType : BinaryUtils.calcObjectType(object); + const objectTypeCode = BinaryUtils.getTypeCode(objectType); + + if (writeObjectType) { + buffer.writeByte(objectTypeCode); + } + switch (objectTypeCode) { + case BinaryUtils.TYPE_CODE.BYTE: + case BinaryUtils.TYPE_CODE.SHORT: + case BinaryUtils.TYPE_CODE.INTEGER: + case BinaryUtils.TYPE_CODE.FLOAT: + case BinaryUtils.TYPE_CODE.DOUBLE: + buffer.writeNumber(object, objectTypeCode); + break; + case BinaryUtils.TYPE_CODE.LONG: + buffer.writeLong(object); + break; + case BinaryUtils.TYPE_CODE.CHAR: + buffer.writeChar(object); + break; + case BinaryUtils.TYPE_CODE.BOOLEAN: + buffer.writeBoolean(object); + break; + case BinaryUtils.TYPE_CODE.STRING: + buffer.writeString(object); + break; + case BinaryUtils.TYPE_CODE.UUID: + this._writeUUID(buffer, object); + break; + case BinaryUtils.TYPE_CODE.DATE: + buffer.writeDate(object); + break; + case BinaryUtils.TYPE_CODE.ENUM: + await this._writeEnum(buffer, object); + break; + case BinaryUtils.TYPE_CODE.DECIMAL: + this._writeDecimal(buffer, object); + break; + case BinaryUtils.TYPE_CODE.TIMESTAMP: + this._writeTimestamp(buffer, object); + break; + case BinaryUtils.TYPE_CODE.TIME: + this._writeTime(buffer, object); + break; + case BinaryUtils.TYPE_CODE.BYTE_ARRAY: + case BinaryUtils.TYPE_CODE.SHORT_ARRAY: + case BinaryUtils.TYPE_CODE.INTEGER_ARRAY: + case BinaryUtils.TYPE_CODE.LONG_ARRAY: + case BinaryUtils.TYPE_CODE.FLOAT_ARRAY: + case BinaryUtils.TYPE_CODE.DOUBLE_ARRAY: + case BinaryUtils.TYPE_CODE.CHAR_ARRAY: + case BinaryUtils.TYPE_CODE.BOOLEAN_ARRAY: + case BinaryUtils.TYPE_CODE.STRING_ARRAY: + case BinaryUtils.TYPE_CODE.UUID_ARRAY: + case BinaryUtils.TYPE_CODE.DATE_ARRAY: + case BinaryUtils.TYPE_CODE.OBJECT_ARRAY: + case BinaryUtils.TYPE_CODE.ENUM_ARRAY: + case BinaryUtils.TYPE_CODE.DECIMAL_ARRAY: + case BinaryUtils.TYPE_CODE.TIMESTAMP_ARRAY: + case BinaryUtils.TYPE_CODE.TIME_ARRAY: + await this._writeArray(buffer, object, objectType, objectTypeCode); + break; + case BinaryUtils.TYPE_CODE.COLLECTION: + await this._writeCollection(buffer, object, objectType); + break; + case BinaryUtils.TYPE_CODE.MAP: + await this._writeMap(buffer, object, objectType); + break; + case BinaryUtils.TYPE_CODE.BINARY_OBJECT: + await this._writeBinaryObject(buffer, object, objectType); + break; + case BinaryUtils.TYPE_CODE.COMPLEX_OBJECT: + await this._writeComplexObject(buffer, object, objectType); + break; + default: + throw Errors.IgniteClientError.unsupportedTypeError(objectType); + } + } + + async _readTypedObject(buffer, objectTypeCode, expectedType = null) { + switch (objectTypeCode) { + case BinaryUtils.TYPE_CODE.BYTE: + case BinaryUtils.TYPE_CODE.SHORT: + case BinaryUtils.TYPE_CODE.INTEGER: + case BinaryUtils.TYPE_CODE.FLOAT: + case BinaryUtils.TYPE_CODE.DOUBLE: + return buffer.readNumber(objectTypeCode); + case BinaryUtils.TYPE_CODE.LONG: + return buffer.readLong().toNumber(); + case BinaryUtils.TYPE_CODE.CHAR: + return buffer.readChar(); + case BinaryUtils.TYPE_CODE.BOOLEAN: + return buffer.readBoolean(); + case BinaryUtils.TYPE_CODE.STRING: + return buffer.readString(); + case BinaryUtils.TYPE_CODE.UUID: + return this._readUUID(buffer); + case BinaryUtils.TYPE_CODE.DATE: + return buffer.readDate(); + case BinaryUtils.TYPE_CODE.ENUM: + case BinaryUtils.TYPE_CODE.BINARY_ENUM: + return await this._readEnum(buffer); + case BinaryUtils.TYPE_CODE.DECIMAL: + return this._readDecimal(buffer); + case BinaryUtils.TYPE_CODE.TIMESTAMP: + return this._readTimestamp(buffer); + case BinaryUtils.TYPE_CODE.TIME: + return buffer.readDate(); + case BinaryUtils.TYPE_CODE.BYTE_ARRAY: + case BinaryUtils.TYPE_CODE.SHORT_ARRAY: + case BinaryUtils.TYPE_CODE.INTEGER_ARRAY: + case BinaryUtils.TYPE_CODE.LONG_ARRAY: + case BinaryUtils.TYPE_CODE.FLOAT_ARRAY: + case BinaryUtils.TYPE_CODE.DOUBLE_ARRAY: + case BinaryUtils.TYPE_CODE.CHAR_ARRAY: + case BinaryUtils.TYPE_CODE.BOOLEAN_ARRAY: + case BinaryUtils.TYPE_CODE.STRING_ARRAY: + case BinaryUtils.TYPE_CODE.UUID_ARRAY: + case BinaryUtils.TYPE_CODE.DATE_ARRAY: + case BinaryUtils.TYPE_CODE.OBJECT_ARRAY: + case BinaryUtils.TYPE_CODE.ENUM_ARRAY: + case BinaryUtils.TYPE_CODE.DECIMAL_ARRAY: + case BinaryUtils.TYPE_CODE.TIMESTAMP_ARRAY: + case BinaryUtils.TYPE_CODE.TIME_ARRAY: + return await this._readArray(buffer, objectTypeCode, expectedType); + case BinaryUtils.TYPE_CODE.COLLECTION: + return await this._readCollection(buffer, expectedType); + case BinaryUtils.TYPE_CODE.MAP: + return await this._readMap(buffer, expectedType); + case BinaryUtils.TYPE_CODE.BINARY_OBJECT: + return await this._readBinaryObject(buffer, expectedType); + case BinaryUtils.TYPE_CODE.NULL: + return null; + case BinaryUtils.TYPE_CODE.COMPLEX_OBJECT: + return await this._readComplexObject(buffer, expectedType); + default: + throw Errors.IgniteClientError.unsupportedTypeError(objectTypeCode); + } + } + + _readUUID(buffer) { + return [...buffer.readBuffer(BinaryUtils.getSize(BinaryUtils.TYPE_CODE.UUID))]; + } + + async _readEnum(buffer) { + const enumItem = new EnumItem(0); + await enumItem._read(this, buffer); + return enumItem; + } + + _readDecimal(buffer) { + const scale = buffer.readInteger(); + const dataLength = buffer.readInteger(); + const data = buffer.readBuffer(dataLength); + const isNegative = (data[0] & 0x80) !== 0; + if (isNegative) { + data[0] &= 0x7F; + } + let result = new Decimal('0x' + data.toString('hex')); + if (isNegative) { + result = result.negated(); + } + return result.mul(Decimal.pow(10, -scale)); + } + + _readTimestamp(buffer) { + return new Timestamp(buffer.readLong().toNumber(), buffer.readInteger()); + } + + async _readArray(buffer, arrayTypeCode, arrayType) { + if (arrayTypeCode === BinaryUtils.TYPE_CODE.OBJECT_ARRAY) { + buffer.readInteger(); + } + const length = buffer.readInteger(); + const elementType = BinaryUtils.getArrayElementType(arrayType ? arrayType : arrayTypeCode); + const keepElementType = elementType === null ? true : BinaryUtils.keepArrayElementType(arrayTypeCode); + const result = new Array(length); + for (let i = 0; i < length; i++) { + result[i] = keepElementType ? + await this.readObject(buffer, elementType) : + await this._readTypedObject(buffer, elementType); + } + return result; + } + + async _readMap(buffer, expectedMapType) { + const result = new Map(); + const size = buffer.readInteger(); + const subType = buffer.readByte(); + let key, value; + for (let i = 0; i < size; i++) { + key = await this.readObject(buffer, expectedMapType ? expectedMapType._keyType : null); + value = await this.readObject(buffer, expectedMapType ? expectedMapType._valueType : null); + result.set(key, value); + } + return result; + } + + async _readCollection(buffer, expectedColType) { + const size = buffer.readInteger(); + const subType = buffer.readByte(); + const isSet = CollectionObjectType._isSet(subType); + const result = isSet ? new Set() : new Array(size); + let element; + for (let i = 0; i < size; i++) { + element = await this.readObject(buffer, expectedColType ? expectedColType._elementType : null); + if (isSet) { + result.add(element); + } + else { + result[i] = element; + } + } + return result; + } + + async _readBinaryObject(buffer, expectedType) { + const size = buffer.readInteger(); + const startPos = buffer.position; + buffer.position = startPos + size; + const offset = buffer.readInteger(); + const endPos = buffer.position; + buffer.position = startPos + offset; + const result = await this.readObject(buffer, expectedType); + buffer.position = endPos; + return result; + } + + async _readComplexObject(buffer, expectedType) { + buffer.position = buffer.position - 1; + const BinaryObject = require('../BinaryObject'); + const binaryObject = await BinaryObject._fromBuffer(this, buffer); + return expectedType ? + await binaryObject.toObject(expectedType) : binaryObject; + } + + _writeUUID(buffer, value) { + buffer.writeBuffer(Buffer.from(value)); + } + + async _writeEnum(buffer, enumValue) { + await enumValue._write(this, buffer); + } + + _writeDecimal(buffer, decimal) { + let strValue = decimal.toExponential(); + let expIndex = strValue.indexOf('e'); + if (expIndex < 0) { + expIndex = strValue.indexOf('E'); + } + let scale = 0; + if (expIndex >= 0) { + scale = parseInt(strValue.substring(expIndex + 1)); + strValue = strValue.substring(0, expIndex); + } + const isNegative = strValue.startsWith('-'); + if (isNegative) { + strValue = strValue.substring(1); + } + const dotIndex = strValue.indexOf('.'); + if (dotIndex >= 0) { + scale -= strValue.length - dotIndex - 1; + strValue = strValue.substring(0, dotIndex) + strValue.substring(dotIndex + 1); + } + scale = -scale; + let hexValue = new Decimal(strValue).toHexadecimal().substring(2); + hexValue = ((hexValue.length % 2 !== 0) ? '000' : '00') + hexValue; + const valueBuffer = Buffer.from(hexValue, 'hex'); + if (isNegative) { + valueBuffer[0] |= 0x80; + } + buffer.writeInteger(scale); + buffer.writeInteger(valueBuffer.length); + buffer.writeBuffer(valueBuffer); + } + + _writeTimestamp(buffer, timestamp) { + buffer.writeDate(timestamp); + buffer.writeInteger(timestamp.getNanos()); + } + + _writeTime(buffer, time) { + const midnight = new Date(time); + midnight.setHours(0, 0, 0, 0); + buffer.writeLong(time.getTime() - midnight.getTime()); + } + + async _writeArray(buffer, array, arrayType, arrayTypeCode) { + const BinaryType = require('./BinaryType'); + const elementType = BinaryUtils.getArrayElementType(arrayType); + const keepElementType = BinaryUtils.keepArrayElementType(arrayTypeCode); + if (arrayTypeCode === BinaryUtils.TYPE_CODE.OBJECT_ARRAY) { + buffer.writeInteger(elementType instanceof ComplexObjectType ? + BinaryType._calculateId(elementType._typeName) : -1); + } + buffer.writeInteger(array.length); + for (let elem of array) { + await this.writeObject(buffer, elem, elementType, keepElementType); + } + } + + async _writeCollection(buffer, collection, collectionType) { + buffer.writeInteger(collection instanceof Set ? collection.size : collection.length); + buffer.writeByte(collectionType._subType); + for (let element of collection) { + await this.writeObject(buffer, element, collectionType._elementType); + } + } + + async _writeMap(buffer, map, mapType) { + buffer.writeInteger(map.size); + buffer.writeByte(mapType._subType); + for (let [key, value] of map.entries()) { + await this.writeObject(buffer, key, mapType._keyType); + await this.writeObject(buffer, value, mapType._valueType); + } + } + + async _writeBinaryObject(buffer, binaryObject) { + buffer.position = buffer.position - 1; + await binaryObject._write(this, buffer); + } + + async _writeComplexObject(buffer, object, objectType) { + const BinaryObject = require('../BinaryObject'); + await this._writeBinaryObject(buffer, await BinaryObject.fromObject(object, objectType)); + } +} + +module.exports = BinaryCommunicator; diff --git a/lib/internal/BinaryReader.js b/lib/internal/BinaryReader.js deleted file mode 100644 index 8c25c39..0000000 --- a/lib/internal/BinaryReader.js +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -'use strict'; - -const Decimal = require('decimal.js'); -const BinaryObject = require('../BinaryObject'); -const CollectionObjectType = require('../ObjectType').CollectionObjectType; -const Errors = require('../Errors'); -const Timestamp = require('../Timestamp'); -const EnumItem = require('../EnumItem'); -const BinaryUtils = require('./BinaryUtils'); - -class BinaryReader { - - static async readObject(buffer, expectedType = null) { - const typeCode = buffer.readByte(); - BinaryUtils.checkTypesComatibility(expectedType, typeCode); - return await BinaryReader._readTypedObject(buffer, typeCode, expectedType); - } - - static async readStringArray(buffer) { - return await BinaryReader._readTypedObject(buffer, BinaryUtils.TYPE_CODE.STRING_ARRAY); - } - - static async _readTypedObject(buffer, objectTypeCode, expectedType = null) { - switch (objectTypeCode) { - case BinaryUtils.TYPE_CODE.BYTE: - case BinaryUtils.TYPE_CODE.SHORT: - case BinaryUtils.TYPE_CODE.INTEGER: - case BinaryUtils.TYPE_CODE.FLOAT: - case BinaryUtils.TYPE_CODE.DOUBLE: - return buffer.readNumber(objectTypeCode); - case BinaryUtils.TYPE_CODE.LONG: - return buffer.readLong().toNumber(); - case BinaryUtils.TYPE_CODE.CHAR: - return buffer.readChar(); - case BinaryUtils.TYPE_CODE.BOOLEAN: - return buffer.readBoolean(); - case BinaryUtils.TYPE_CODE.STRING: - return buffer.readString(); - case BinaryUtils.TYPE_CODE.UUID: - return BinaryReader._readUUID(buffer); - case BinaryUtils.TYPE_CODE.DATE: - return buffer.readDate(); - case BinaryUtils.TYPE_CODE.ENUM: - case BinaryUtils.TYPE_CODE.BINARY_ENUM: - return await BinaryReader._readEnum(buffer); - case BinaryUtils.TYPE_CODE.DECIMAL: - return BinaryReader._readDecimal(buffer); - case BinaryUtils.TYPE_CODE.TIMESTAMP: - return BinaryReader._readTimestamp(buffer); - case BinaryUtils.TYPE_CODE.TIME: - return buffer.readDate(); - case BinaryUtils.TYPE_CODE.BYTE_ARRAY: - case BinaryUtils.TYPE_CODE.SHORT_ARRAY: - case BinaryUtils.TYPE_CODE.INTEGER_ARRAY: - case BinaryUtils.TYPE_CODE.LONG_ARRAY: - case BinaryUtils.TYPE_CODE.FLOAT_ARRAY: - case BinaryUtils.TYPE_CODE.DOUBLE_ARRAY: - case BinaryUtils.TYPE_CODE.CHAR_ARRAY: - case BinaryUtils.TYPE_CODE.BOOLEAN_ARRAY: - case BinaryUtils.TYPE_CODE.STRING_ARRAY: - case BinaryUtils.TYPE_CODE.UUID_ARRAY: - case BinaryUtils.TYPE_CODE.DATE_ARRAY: - case BinaryUtils.TYPE_CODE.OBJECT_ARRAY: - case BinaryUtils.TYPE_CODE.ENUM_ARRAY: - case BinaryUtils.TYPE_CODE.DECIMAL_ARRAY: - case BinaryUtils.TYPE_CODE.TIMESTAMP_ARRAY: - case BinaryUtils.TYPE_CODE.TIME_ARRAY: - return await BinaryReader._readArray(buffer, objectTypeCode, expectedType); - case BinaryUtils.TYPE_CODE.COLLECTION: - return await BinaryReader._readCollection(buffer, expectedType); - case BinaryUtils.TYPE_CODE.MAP: - return await BinaryReader._readMap(buffer, expectedType); - case BinaryUtils.TYPE_CODE.BINARY_OBJECT: - return await BinaryReader._readBinaryObject(buffer, expectedType); - case BinaryUtils.TYPE_CODE.NULL: - return null; - case BinaryUtils.TYPE_CODE.COMPLEX_OBJECT: - return await BinaryReader._readComplexObject(buffer, expectedType); - default: - throw Errors.IgniteClientError.unsupportedTypeError(objectTypeCode); - } - } - - static _readUUID(buffer) { - return [...buffer.readBuffer(BinaryUtils.getSize(BinaryUtils.TYPE_CODE.UUID))]; - } - - static async _readEnum(buffer) { - const enumItem = new EnumItem(0); - await enumItem._read(buffer); - return enumItem; - } - - static _readDecimal(buffer) { - const scale = buffer.readInteger(); - const dataLength = buffer.readInteger(); - const data = buffer.readBuffer(dataLength); - const isNegative = (data[0] & 0x80) !== 0; - if (isNegative) { - data[0] &= 0x7F; - } - let result = new Decimal('0x' + data.toString('hex')); - if (isNegative) { - result = result.negated(); - } - return result.mul(Decimal.pow(10, -scale)); - } - - static _readTimestamp(buffer) { - return new Timestamp(buffer.readLong().toNumber(), buffer.readInteger()); - } - - static async _readArray(buffer, arrayTypeCode, arrayType) { - if (arrayTypeCode === BinaryUtils.TYPE_CODE.OBJECT_ARRAY) { - buffer.readInteger(); - } - const length = buffer.readInteger(); - const elementType = BinaryUtils.getArrayElementType(arrayType ? arrayType : arrayTypeCode); - const keepElementType = elementType === null ? true : BinaryUtils.keepArrayElementType(arrayTypeCode); - const result = new Array(length); - for (let i = 0; i < length; i++) { - result[i] = keepElementType ? - await BinaryReader.readObject(buffer, elementType) : - await BinaryReader._readTypedObject(buffer, elementType); - } - return result; - } - - static async _readMap(buffer, expectedMapType) { - const result = new Map(); - const size = buffer.readInteger(); - const subType = buffer.readByte(); - let key, value; - for (let i = 0; i < size; i++) { - key = await BinaryReader.readObject(buffer, expectedMapType ? expectedMapType._keyType : null); - value = await BinaryReader.readObject(buffer, expectedMapType ? expectedMapType._valueType : null); - result.set(key, value); - } - return result; - } - - static async _readCollection(buffer, expectedColType) { - const size = buffer.readInteger(); - const subType = buffer.readByte(); - const isSet = CollectionObjectType._isSet(subType); - const result = isSet ? new Set() : new Array(size); - let element; - for (let i = 0; i < size; i++) { - element = await BinaryReader.readObject(buffer, expectedColType ? expectedColType._elementType : null); - if (isSet) { - result.add(element); - } - else { - result[i] = element; - } - } - return result; - } - - static async _readBinaryObject(buffer, expectedType) { - const size = buffer.readInteger(); - const startPos = buffer.position; - buffer.position = startPos + size; - const offset = buffer.readInteger(); - const endPos = buffer.position; - buffer.position = startPos + offset; - const result = await BinaryReader.readObject(buffer, expectedType); - buffer.position = endPos; - return result; - } - - static async _readComplexObject(buffer, expectedType) { - buffer.position = buffer.position - 1; - const binaryObject = await BinaryObject._fromBuffer(buffer); - return expectedType ? - await binaryObject.toObject(expectedType) : binaryObject; - } -} - -module.exports = BinaryReader; diff --git a/lib/internal/BinaryType.js b/lib/internal/BinaryType.js index b9e239d..98aa6a3 100644 --- a/lib/internal/BinaryType.js +++ b/lib/internal/BinaryType.js @@ -21,7 +21,7 @@ const Util = require('util'); const ComplexObjectType = require('../ObjectType').ComplexObjectType; const BinaryTypeStorage = require('./BinaryTypeStorage'); const BinaryUtils = require('./BinaryUtils'); -const BinaryWriter = require('./BinaryWriter'); +const BinaryCommunicator = require('./BinaryCommunicator'); const Errors = require('../Errors'); class BinaryType { @@ -112,9 +112,9 @@ class BinaryType { // type id buffer.writeInteger(this._id); // type name - await BinaryWriter.writeString(buffer, this._name); + BinaryCommunicator.writeString(buffer, this._name); // affinity key field name - await BinaryWriter.writeString(buffer, null); + BinaryCommunicator.writeString(buffer, null); // fields count buffer.writeInteger(this._fields.size); // fields @@ -136,7 +136,7 @@ class BinaryType { buffer.writeInteger(length); if (length > 0) { for (let [key, value] of this._enumValues) { - await BinaryWriter.writeString(buffer, key); + BinaryCommunicator.writeString(buffer, key); buffer.writeInteger(value); } } @@ -147,10 +147,9 @@ class BinaryType { // type id this._id = buffer.readInteger(); // type name - const BinaryReader = require('./BinaryReader'); - this._name = await BinaryReader.readObject(buffer); + this._name = BinaryCommunicator.readString(buffer); // affinity key field name - await BinaryReader.readObject(buffer); + BinaryCommunicator.readString(buffer); // fields count const fieldsCount = buffer.readInteger(); // fields @@ -173,13 +172,12 @@ class BinaryType { } async _readEnum(buffer) { - const BinaryReader = require('./BinaryReader'); this._isEnum = buffer.readBoolean(); if (this._isEnum) { const valuesCount = buffer.readInteger(); this._enumValues = new Array(valuesCount); for (let i = 0; i < valuesCount; i++) { - this._enumValues[i] = [await BinaryReader.readObject(buffer), buffer.readInteger()]; + this._enumValues[i] = [BinaryCommunicator.readString(buffer), buffer.readInteger()]; } } } @@ -312,7 +310,7 @@ class BinaryField { async _write(buffer) { // field name - await BinaryWriter.writeString(buffer, this._name); + BinaryCommunicator.writeString(buffer, this._name); // type code buffer.writeInteger(this._typeCode); // field id @@ -320,9 +318,8 @@ class BinaryField { } async _read(buffer) { - const BinaryReader = require('./BinaryReader'); // field name - this._name = await BinaryReader.readObject(buffer); + this._name = BinaryCommunicator.readString(buffer); // type code this._typeCode = buffer.readInteger(); // field id @@ -338,10 +335,10 @@ class BinaryTypeBuilder { return result; } - static async fromTypeId(typeId, schemaId, hasSchema) { + static async fromTypeId(communicator, typeId, schemaId, hasSchema) { let result = new BinaryTypeBuilder(); if (hasSchema) { - let type = await BinaryTypeStorage.getEntity().getType(typeId, schemaId); + let type = await communicator.typeStorage.getType(typeId, schemaId); if (type) { result._type = type; result._schema = type.getSchema(schemaId); @@ -372,7 +369,7 @@ class BinaryTypeBuilder { static fromComplexObjectType(complexObjectType, jsObject) { let result = new BinaryTypeBuilder(); - const typeInfo = BinaryTypeStorage.getEntity().getByComplexObjectType(complexObjectType); + const typeInfo = BinaryTypeStorage.getByComplexObjectType(complexObjectType); if (typeInfo) { result._type = typeInfo[0]; result._schema = typeInfo[1]; @@ -380,7 +377,7 @@ class BinaryTypeBuilder { } else { result._fromComplexObjectType(complexObjectType, jsObject); - BinaryTypeStorage.getEntity().setByComplexObjectType(complexObjectType, result._type, result._schema); + BinaryTypeStorage.setByComplexObjectType(complexObjectType, result._type, result._schema); } return result; } @@ -424,9 +421,9 @@ class BinaryTypeBuilder { } } - async finalize() { + async finalize(communicator) { this._schema.finalize(); - await BinaryTypeStorage.getEntity().addType(this._type, this._schema); + await communicator.typeStorage.addType(this._type, this._schema); } constructor() { diff --git a/lib/internal/BinaryTypeStorage.js b/lib/internal/BinaryTypeStorage.js index d79156b..144710a 100644 --- a/lib/internal/BinaryTypeStorage.js +++ b/lib/internal/BinaryTypeStorage.js @@ -22,15 +22,26 @@ const BinaryUtils = require('./BinaryUtils'); class BinaryTypeStorage { - static getEntity() { - if (!BinaryTypeStorage._entity) { - throw Errors.IgniteClientError.internalError(); + constructor(communicator) { + this._communicator = communicator; + this._types = new Map(); + } + + static getByComplexObjectType(complexObjectType) { + return BinaryTypeStorage.complexObjectTypes.get(complexObjectType); + } + + static setByComplexObjectType(complexObjectType, type, schema) { + if (!BinaryTypeStorage.complexObjectTypes.has(complexObjectType)) { + BinaryTypeStorage.complexObjectTypes.set(complexObjectType, [type, schema]); } - return BinaryTypeStorage._entity; } - static createEntity(socket) { - BinaryTypeStorage._entity = new BinaryTypeStorage(socket); + static get complexObjectTypes() { + if (!BinaryTypeStorage._complexObjectTypes) { + BinaryTypeStorage._complexObjectTypes = new Map(); + } + return BinaryTypeStorage._complexObjectTypes; } async addType(binaryType, binarySchema) { @@ -61,29 +72,13 @@ class BinaryTypeStorage { return storageType; } - getByComplexObjectType(complexObjectType) { - return this._complexObjectTypes.get(complexObjectType); - } - - setByComplexObjectType(complexObjectType, type, schema) { - if (!this._complexObjectTypes.has(complexObjectType)) { - this._complexObjectTypes.set(complexObjectType, [type, schema]); - } - } - /** Private methods */ - constructor(socket) { - this._socket = socket; - this._types = new Map(); - this._complexObjectTypes = new Map(); - } - async _getBinaryType(typeId) { const BinaryType = require('./BinaryType'); let binaryType = new BinaryType(null); binaryType._id = typeId; - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.GET_BINARY_TYPE, async (payload) => { payload.writeInteger(typeId); @@ -101,7 +96,7 @@ class BinaryTypeStorage { } async _putBinaryType(binaryType) { - await this._socket.send( + await this._communicator.send( BinaryUtils.OPERATION.PUT_BINARY_TYPE, async (payload) => { await binaryType._write(payload); diff --git a/lib/internal/BinaryWriter.js b/lib/internal/BinaryWriter.js deleted file mode 100644 index 3686bb4..0000000 --- a/lib/internal/BinaryWriter.js +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -'use strict'; - -const Decimal = require('decimal.js'); -const Errors = require('../Errors'); -const ComplexObjectType = require('../ObjectType').ComplexObjectType; -const BinaryUtils = require('./BinaryUtils'); - -class BinaryWriter { - - static async writeString(buffer, value) { - await BinaryWriter.writeObject(buffer, value, BinaryUtils.TYPE_CODE.STRING); - } - - static async writeObject(buffer, object, objectType = null, writeObjectType = true) { - BinaryUtils.checkCompatibility(object, objectType); - if (object === null) { - buffer.writeByte(BinaryUtils.TYPE_CODE.NULL); - return; - } - - objectType = objectType ? objectType : BinaryUtils.calcObjectType(object); - const objectTypeCode = BinaryUtils.getTypeCode(objectType); - - if (writeObjectType) { - buffer.writeByte(objectTypeCode); - } - switch (objectTypeCode) { - case BinaryUtils.TYPE_CODE.BYTE: - case BinaryUtils.TYPE_CODE.SHORT: - case BinaryUtils.TYPE_CODE.INTEGER: - case BinaryUtils.TYPE_CODE.FLOAT: - case BinaryUtils.TYPE_CODE.DOUBLE: - buffer.writeNumber(object, objectTypeCode); - break; - case BinaryUtils.TYPE_CODE.LONG: - buffer.writeLong(object); - break; - case BinaryUtils.TYPE_CODE.CHAR: - buffer.writeChar(object); - break; - case BinaryUtils.TYPE_CODE.BOOLEAN: - buffer.writeBoolean(object); - break; - case BinaryUtils.TYPE_CODE.STRING: - buffer.writeString(object); - break; - case BinaryUtils.TYPE_CODE.UUID: - BinaryWriter._writeUUID(buffer, object); - break; - case BinaryUtils.TYPE_CODE.DATE: - buffer.writeDate(object); - break; - case BinaryUtils.TYPE_CODE.ENUM: - await BinaryWriter._writeEnum(buffer, object); - break; - case BinaryUtils.TYPE_CODE.DECIMAL: - BinaryWriter._writeDecimal(buffer, object); - break; - case BinaryUtils.TYPE_CODE.TIMESTAMP: - BinaryWriter._writeTimestamp(buffer, object); - break; - case BinaryUtils.TYPE_CODE.TIME: - BinaryWriter._writeTime(buffer, object); - break; - case BinaryUtils.TYPE_CODE.BYTE_ARRAY: - case BinaryUtils.TYPE_CODE.SHORT_ARRAY: - case BinaryUtils.TYPE_CODE.INTEGER_ARRAY: - case BinaryUtils.TYPE_CODE.LONG_ARRAY: - case BinaryUtils.TYPE_CODE.FLOAT_ARRAY: - case BinaryUtils.TYPE_CODE.DOUBLE_ARRAY: - case BinaryUtils.TYPE_CODE.CHAR_ARRAY: - case BinaryUtils.TYPE_CODE.BOOLEAN_ARRAY: - case BinaryUtils.TYPE_CODE.STRING_ARRAY: - case BinaryUtils.TYPE_CODE.UUID_ARRAY: - case BinaryUtils.TYPE_CODE.DATE_ARRAY: - case BinaryUtils.TYPE_CODE.OBJECT_ARRAY: - case BinaryUtils.TYPE_CODE.ENUM_ARRAY: - case BinaryUtils.TYPE_CODE.DECIMAL_ARRAY: - case BinaryUtils.TYPE_CODE.TIMESTAMP_ARRAY: - case BinaryUtils.TYPE_CODE.TIME_ARRAY: - await BinaryWriter._writeArray(buffer, object, objectType, objectTypeCode); - break; - case BinaryUtils.TYPE_CODE.COLLECTION: - await BinaryWriter._writeCollection(buffer, object, objectType); - break; - case BinaryUtils.TYPE_CODE.MAP: - await BinaryWriter._writeMap(buffer, object, objectType); - break; - case BinaryUtils.TYPE_CODE.BINARY_OBJECT: - await BinaryWriter._writeBinaryObject(buffer, object, objectType); - break; - case BinaryUtils.TYPE_CODE.COMPLEX_OBJECT: - await BinaryWriter._writeComplexObject(buffer, object, objectType); - break; - default: - throw Errors.IgniteClientError.unsupportedTypeError(objectType); - } - } - - static _writeUUID(buffer, value) { - buffer.writeBuffer(Buffer.from(value)); - } - - static async _writeEnum(buffer, enumValue) { - await enumValue._write(buffer); - } - - static _writeDecimal(buffer, decimal) { - let strValue = decimal.toExponential(); - let expIndex = strValue.indexOf('e'); - if (expIndex < 0) { - expIndex = strValue.indexOf('E'); - } - let scale = 0; - if (expIndex >= 0) { - scale = parseInt(strValue.substring(expIndex + 1)); - strValue = strValue.substring(0, expIndex); - } - const isNegative = strValue.startsWith('-'); - if (isNegative) { - strValue = strValue.substring(1); - } - const dotIndex = strValue.indexOf('.'); - if (dotIndex >= 0) { - scale -= strValue.length - dotIndex - 1; - strValue = strValue.substring(0, dotIndex) + strValue.substring(dotIndex + 1); - } - scale = -scale; - let hexValue = new Decimal(strValue).toHexadecimal().substring(2); - hexValue = ((hexValue.length % 2 !== 0) ? '000' : '00') + hexValue; - const valueBuffer = Buffer.from(hexValue, 'hex'); - if (isNegative) { - valueBuffer[0] |= 0x80; - } - buffer.writeInteger(scale); - buffer.writeInteger(valueBuffer.length); - buffer.writeBuffer(valueBuffer); - } - - static _writeTimestamp(buffer, timestamp) { - buffer.writeDate(timestamp); - buffer.writeInteger(timestamp.getNanos()); - } - - static _writeTime(buffer, time) { - const midnight = new Date(time); - midnight.setHours(0, 0, 0, 0); - buffer.writeLong(time.getTime() - midnight.getTime()); - } - - static async _writeArray(buffer, array, arrayType, arrayTypeCode) { - const BinaryType = require('./BinaryType'); - const elementType = BinaryUtils.getArrayElementType(arrayType); - const keepElementType = BinaryUtils.keepArrayElementType(arrayTypeCode); - if (arrayTypeCode === BinaryUtils.TYPE_CODE.OBJECT_ARRAY) { - buffer.writeInteger(elementType instanceof ComplexObjectType ? - BinaryType._calculateId(elementType._typeName) : -1); - } - buffer.writeInteger(array.length); - for (let elem of array) { - await BinaryWriter.writeObject(buffer, elem, elementType, keepElementType); - } - } - - static async _writeCollection(buffer, collection, collectionType) { - buffer.writeInteger(collection instanceof Set ? collection.size : collection.length); - buffer.writeByte(collectionType._subType); - for (let element of collection) { - await BinaryWriter.writeObject(buffer, element, collectionType._elementType); - } - } - - static async _writeMap(buffer, map, mapType) { - buffer.writeInteger(map.size); - buffer.writeByte(mapType._subType); - for (let [key, value] of map.entries()) { - await BinaryWriter.writeObject(buffer, key, mapType._keyType); - await BinaryWriter.writeObject(buffer, value, mapType._valueType); - } - } - - static async _writeBinaryObject(buffer, binaryObject) { - buffer.position = buffer.position - 1; - await binaryObject._write(buffer); - } - - static async _writeComplexObject(buffer, object, objectType) { - const BinaryObject = require('../BinaryObject'); - await BinaryWriter._writeBinaryObject(buffer, await BinaryObject.fromObject(object, objectType)); - } -} - -module.exports = BinaryWriter; diff --git a/lib/internal/ClientSocket.js b/lib/internal/ClientSocket.js index 1f12040..ac7ccec 100644 --- a/lib/internal/ClientSocket.js +++ b/lib/internal/ClientSocket.js @@ -26,8 +26,7 @@ const Errors = require('../Errors'); const IgniteClientConfiguration = require('../IgniteClientConfiguration'); const MessageBuffer = require('./MessageBuffer'); const BinaryUtils = require('./BinaryUtils'); -const BinaryReader = require('./BinaryReader'); -const BinaryWriter = require('./BinaryWriter'); +const BinaryCommunicator = require('./BinaryCommunicator'); const ArgumentChecker = require('./ArgumentChecker'); const Logger = require('./Logger'); @@ -240,7 +239,7 @@ class ClientSocket { const serverVersion = new ProtocolVersion(); serverVersion.read(buffer); // Error message - const errMessage = await BinaryReader.readObject(buffer); + const errMessage = BinaryCommunicator.readString(buffer); if (!this._protocolVersion.equals(serverVersion)) { if (!this._isSupportedVersion(serverVersion) || @@ -271,7 +270,7 @@ class ClientSocket { async _finalizeResponse(buffer, request, isSuccess) { if (!isSuccess) { // Error message - const errMessage = await BinaryReader.readObject(buffer); + const errMessage = BinaryCommunicator.readString(buffer); request.reject(new Errors.OperationError(errMessage)); } else { @@ -295,8 +294,8 @@ class ClientSocket { // Client code payload.writeByte(2); if (this._config._userName) { - await BinaryWriter.writeString(payload, this._config._userName); - await BinaryWriter.writeString(payload, this._config._password); + BinaryCommunicator.writeString(payload, this._config._userName); + BinaryCommunicator.writeString(payload, this._config._password); } } diff --git a/spec/examples/AuthExample.spec.js b/spec/examples/AuthExample.spec.js index 3fb9205..667a396 100644 --- a/spec/examples/AuthExample.spec.js +++ b/spec/examples/AuthExample.spec.js @@ -20,6 +20,11 @@ const TestingHelper = require('../TestingHelper'); describe('execute auth example >', () => { + beforeAll((done) => { + jasmine.DEFAULT_TIMEOUT_INTERVAL = TestingHelper.TIMEOUT; + done(); + }); + it('AuthTlsExample', (done) => { TestingHelper.executeExample('examples/AuthTlsExample.js'). then(done). diff --git a/spec/examples/Examples.spec.js b/spec/examples/Examples.spec.js index c8dce3c..2ba5f08 100644 --- a/spec/examples/Examples.spec.js +++ b/spec/examples/Examples.spec.js @@ -20,6 +20,11 @@ const TestingHelper = require('../TestingHelper'); describe('execute examples >', () => { + beforeAll((done) => { + jasmine.DEFAULT_TIMEOUT_INTERVAL = TestingHelper.TIMEOUT; + done(); + }); + it('CachePutGetExample', (done) => { TestingHelper.executeExample('examples/CachePutGetExample.js'). then(done).
