THRIFT-2964: nodejs file breakout Client: NodeJS Patch: Andrew de Andrade Moves protocols and transports (among others) into seperate files.
Project: http://git-wip-us.apache.org/repos/asf/thrift/repo Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/96f4f07b Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/96f4f07b Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/96f4f07b Branch: refs/heads/master Commit: 96f4f07beb758db9c17d1332b5d225b6b627538c Parents: 43509df Author: Randy Abernethy <[email protected]> Authored: Tue Feb 10 02:29:15 2015 -0800 Committer: Randy Abernethy <[email protected]> Committed: Tue Feb 10 02:29:15 2015 -0800 ---------------------------------------------------------------------- lib/nodejs/lib/thrift/binary_protocol.js | 356 ++++ lib/nodejs/lib/thrift/buffered_transport.js | 175 ++ lib/nodejs/lib/thrift/compact_protocol.js | 907 +++++++++ lib/nodejs/lib/thrift/connection.js | 85 +- lib/nodejs/lib/thrift/create_client.js | 54 + lib/nodejs/lib/thrift/framed_transport.js | 182 ++ lib/nodejs/lib/thrift/http_connection.js | 75 +- lib/nodejs/lib/thrift/index.js | 10 +- .../lib/thrift/input_buffer_underrun_error.js | 27 + lib/nodejs/lib/thrift/json_protocol.js | 706 +++++++ lib/nodejs/lib/thrift/log.js | 26 + lib/nodejs/lib/thrift/multiplexed_processor.js | 63 +- lib/nodejs/lib/thrift/multiplexed_protocol.js | 81 +- lib/nodejs/lib/thrift/protocol.js | 1727 +----------------- lib/nodejs/lib/thrift/server.js | 14 +- lib/nodejs/lib/thrift/transport.js | 284 +-- lib/nodejs/lib/thrift/web_server.js | 93 +- lib/nodejs/lib/thrift/ws_connection.js | 42 +- lib/nodejs/test/binary.test.js | 8 +- lib/nodejs/test/client.js | 4 +- lib/nodejs/test/http_client.js | 6 +- lib/nodejs/test/http_server.js | 22 +- lib/nodejs/test/multiplex_client.js | 2 +- lib/nodejs/test/server.js | 6 +- lib/nodejs/test/test_handler.js | 2 +- lib/nodejs/test/thrift_test_driver.js | 12 +- lib/nodejs/test/thrift_test_driver_promise.js | 6 +- lib/nodejs/test/ws_client.js | 6 +- 28 files changed, 2692 insertions(+), 2289 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/thrift/blob/96f4f07b/lib/nodejs/lib/thrift/binary_protocol.js ---------------------------------------------------------------------- diff --git a/lib/nodejs/lib/thrift/binary_protocol.js b/lib/nodejs/lib/thrift/binary_protocol.js new file mode 100644 index 0000000..a230291 --- /dev/null +++ b/lib/nodejs/lib/thrift/binary_protocol.js @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +var log = require('./log'); +var binary = require('./binary'); +var Int64 = require('node-int64'); +var Thrift = require('./thrift'); +var Type = Thrift.Type; + +module.exports = TBinaryProtocol; + +// JavaScript supports only numeric doubles, therefore even hex values are always signed. +// The largest integer value which can be represented in JavaScript is +/-2^53. +// Bitwise operations convert numbers to 32 bit integers but perform sign extension +// upon assigning values back to variables. +var VERSION_MASK = -65536, // 0xffff0000 + VERSION_1 = -2147418112, // 0x80010000 + TYPE_MASK = 0x000000ff; + +function TBinaryProtocol(trans, strictRead, strictWrite) { + this.trans = trans; + this.strictRead = (strictRead !== undefined ? strictRead : false); + this.strictWrite = (strictWrite !== undefined ? strictWrite : true); +}; + +TBinaryProtocol.prototype.flush = function() { + return this.trans.flush(); +}; + +TBinaryProtocol.prototype.writeMessageBegin = function(name, type, seqid) { + if (this.strictWrite) { + this.writeI32(VERSION_1 | type); + this.writeString(name); + this.writeI32(seqid); + } else { + this.writeString(name); + this.writeByte(type); + this.writeI32(seqid); + } + // Record client seqid to find callback again + if (this._seqid) { + // TODO better logging log warning + log.warning('SeqId already set', { 'name': name }); + } else { + this._seqid = seqid; + this.trans.setCurrSeqId(seqid); + } +}; + +TBinaryProtocol.prototype.writeMessageEnd = function() { + if (this._seqid) { + this._seqid = null; + } else { + log.warning('No seqid to unset'); + } +}; + +TBinaryProtocol.prototype.writeStructBegin = function(name) { +}; + +TBinaryProtocol.prototype.writeStructEnd = function() { +}; + +TBinaryProtocol.prototype.writeFieldBegin = function(name, type, id) { + this.writeByte(type); + this.writeI16(id); +}; + +TBinaryProtocol.prototype.writeFieldEnd = function() { +}; + +TBinaryProtocol.prototype.writeFieldStop = function() { + this.writeByte(Type.STOP); +}; + +TBinaryProtocol.prototype.writeMapBegin = function(ktype, vtype, size) { + this.writeByte(ktype); + this.writeByte(vtype); + this.writeI32(size); +}; + +TBinaryProtocol.prototype.writeMapEnd = function() { +}; + +TBinaryProtocol.prototype.writeListBegin = function(etype, size) { + this.writeByte(etype); + this.writeI32(size); +}; + +TBinaryProtocol.prototype.writeListEnd = function() { +}; + +TBinaryProtocol.prototype.writeSetBegin = function(etype, size) { + this.writeByte(etype); + this.writeI32(size); +}; + +TBinaryProtocol.prototype.writeSetEnd = function() { +}; + +TBinaryProtocol.prototype.writeBool = function(bool) { + if (bool) { + this.writeByte(1); + } else { + this.writeByte(0); + } +}; + +TBinaryProtocol.prototype.writeByte = function(b) { + this.trans.write(new Buffer([b])); +}; + +TBinaryProtocol.prototype.writeI16 = function(i16) { + this.trans.write(binary.writeI16(new Buffer(2), i16)); +}; + +TBinaryProtocol.prototype.writeI32 = function(i32) { + this.trans.write(binary.writeI32(new Buffer(4), i32)); +}; + +TBinaryProtocol.prototype.writeI64 = function(i64) { + if (i64.buffer) { + this.trans.write(i64.buffer); + } else { + this.trans.write(new Int64(i64).buffer); + } +}; + +TBinaryProtocol.prototype.writeDouble = function(dub) { + this.trans.write(binary.writeDouble(new Buffer(8), dub)); +}; + +TBinaryProtocol.prototype.writeString = function(arg) { + if (typeof(arg) === 'string') { + this.writeI32(Buffer.byteLength(arg, 'utf8')); + this.trans.write(arg, 'utf8'); + } else if (arg instanceof Buffer) { + this.writeI32(arg.length); + this.trans.write(arg); + } else { + throw new Error('writeString called without a string/Buffer argument: ' + arg); + } +}; + +TBinaryProtocol.prototype.writeBinary = function(arg) { + if (typeof(arg) === 'string') { + this.writeI32(Buffer.byteLength(arg, 'utf8')); + this.trans.write(arg, 'utf8'); + } else if ((arg instanceof Buffer) || + (Object.prototype.toString.call(arg) == '[object Uint8Array]')) { + // Buffers in Node.js under Browserify may extend UInt8Array instead of + // defining a new object. We detect them here so we can write them + // correctly + this.writeI32(arg.length); + this.trans.write(arg); + } else { + throw new Error('writeBinary called without a string/Buffer argument: ' + arg); + } +}; + +TBinaryProtocol.prototype.readMessageBegin = function() { + var sz = this.readI32(); + var type, name, seqid; + + if (sz < 0) { + var version = sz & VERSION_MASK; + if (version != VERSION_1) { + console.log("BAD: " + version); + throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "Bad version in readMessageBegin: " + sz); + } + type = sz & TYPE_MASK; + name = this.readString(); + seqid = this.readI32(); + } else { + if (this.strictRead) { + throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "No protocol version header"); + } + name = this.trans.read(sz); + type = this.readByte(); + seqid = this.readI32(); + } + return {fname: name, mtype: type, rseqid: seqid}; +}; + +TBinaryProtocol.prototype.readMessageEnd = function() { +}; + +TBinaryProtocol.prototype.readStructBegin = function() { + return {fname: ''}; +}; + +TBinaryProtocol.prototype.readStructEnd = function() { +}; + +TBinaryProtocol.prototype.readFieldBegin = function() { + var type = this.readByte(); + if (type == Type.STOP) { + return {fname: null, ftype: type, fid: 0}; + } + var id = this.readI16(); + return {fname: null, ftype: type, fid: id}; +}; + +TBinaryProtocol.prototype.readFieldEnd = function() { +}; + +TBinaryProtocol.prototype.readMapBegin = function() { + var ktype = this.readByte(); + var vtype = this.readByte(); + var size = this.readI32(); + return {ktype: ktype, vtype: vtype, size: size}; +}; + +TBinaryProtocol.prototype.readMapEnd = function() { +}; + +TBinaryProtocol.prototype.readListBegin = function() { + var etype = this.readByte(); + var size = this.readI32(); + return {etype: etype, size: size}; +}; + +TBinaryProtocol.prototype.readListEnd = function() { +}; + +TBinaryProtocol.prototype.readSetBegin = function() { + var etype = this.readByte(); + var size = this.readI32(); + return {etype: etype, size: size}; +}; + +TBinaryProtocol.prototype.readSetEnd = function() { +}; + +TBinaryProtocol.prototype.readBool = function() { + var b = this.readByte(); + if (b === 0) { + return false; + } + return true; +}; + +TBinaryProtocol.prototype.readByte = function() { + return this.trans.readByte(); +}; + +TBinaryProtocol.prototype.readI16 = function() { + return this.trans.readI16(); +}; + +TBinaryProtocol.prototype.readI32 = function() { + return this.trans.readI32(); +}; + +TBinaryProtocol.prototype.readI64 = function() { + var buff = this.trans.read(8); + return new Int64(buff); +}; + +TBinaryProtocol.prototype.readDouble = function() { + return this.trans.readDouble(); +}; + +TBinaryProtocol.prototype.readBinary = function() { + var len = this.readI32(); + return this.trans.read(len); +}; + +TBinaryProtocol.prototype.readString = function() { + var len = this.readI32(); + return this.trans.readString(len); +}; + +TBinaryProtocol.prototype.getTransport = function() { + return this.trans; +}; + +TBinaryProtocol.prototype.skip = function(type) { + switch (type) { + case Type.STOP: + return; + case Type.BOOL: + this.readBool(); + break; + case Type.BYTE: + this.readByte(); + break; + case Type.I16: + this.readI16(); + break; + case Type.I32: + this.readI32(); + break; + case Type.I64: + this.readI64(); + break; + case Type.DOUBLE: + this.readDouble(); + break; + case Type.STRING: + this.readString(); + break; + case Type.STRUCT: + this.readStructBegin(); + while (true) { + var r = this.readFieldBegin(); + if (r.ftype === Type.STOP) { + break; + } + this.skip(r.ftype); + this.readFieldEnd(); + } + this.readStructEnd(); + break; + case Type.MAP: + var mapBegin = this.readMapBegin(); + for (var i = 0; i < mapBegin.size; ++i) { + this.skip(mapBegin.ktype); + this.skip(mapBegin.vtype); + } + this.readMapEnd(); + break; + case Type.SET: + var setBegin = this.readSetBegin(); + for (var i2 = 0; i2 < setBegin.size; ++i2) { + this.skip(setBegin.etype); + } + this.readSetEnd(); + break; + case Type.LIST: + var listBegin = this.readListBegin(); + for (var i3 = 0; i3 < listBegin.size; ++i3) { + this.skip(listBegin.etype); + } + this.readListEnd(); + break; + default: + throw new Error("Invalid type: " + type); + } +}; http://git-wip-us.apache.org/repos/asf/thrift/blob/96f4f07b/lib/nodejs/lib/thrift/buffered_transport.js ---------------------------------------------------------------------- diff --git a/lib/nodejs/lib/thrift/buffered_transport.js b/lib/nodejs/lib/thrift/buffered_transport.js new file mode 100644 index 0000000..13636b5 --- /dev/null +++ b/lib/nodejs/lib/thrift/buffered_transport.js @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +var binary = require('./binary'); +var InputBufferUnderrunError = require('./input_buffer_underrun_error'); + +module.exports = TBufferedTransport; + +function TBufferedTransport(buffer, callback) { + this.defaultReadBufferSize = 1024; + this.writeBufferSize = 512; // Soft Limit + this.inBuf = new Buffer(this.defaultReadBufferSize); + this.readCursor = 0; + this.writeCursor = 0; // for input buffer + this.outBuffers = []; + this.outCount = 0; + this.onFlush = callback; +}; + +TBufferedTransport.receiver = function(callback, seqid) { + var reader = new TBufferedTransport(); + + return function(data) { + if (reader.writeCursor + data.length > reader.inBuf.length) { + var buf = new Buffer(reader.writeCursor + data.length); + reader.inBuf.copy(buf, 0, 0, reader.writeCursor); + reader.inBuf = buf; + } + data.copy(reader.inBuf, reader.writeCursor, 0); + reader.writeCursor += data.length; + + callback(reader, seqid); + }; +}; + + +TBufferedTransport.prototype.commitPosition = function(){ + var unreadSize = this.writeCursor - this.readCursor; + var bufSize = (unreadSize * 2 > this.defaultReadBufferSize) ? + unreadSize * 2 : this.defaultReadBufferSize; + var buf = new Buffer(bufSize); + if (unreadSize > 0) { + this.inBuf.copy(buf, 0, this.readCursor, this.writeCursor); + } + this.readCursor = 0; + this.writeCursor = unreadSize; + this.inBuf = buf; +}; + +TBufferedTransport.prototype.rollbackPosition = function(){ + this.readCursor = 0; +} + + // TODO: Implement open/close support +TBufferedTransport.prototype.isOpen = function() { + return true; +}; + +TBufferedTransport.prototype.open = function() { +}; + +TBufferedTransport.prototype.close = function() { +}; + + // Set the seqid of the message in the client + // So that callbacks can be found +TBufferedTransport.prototype.setCurrSeqId = function(seqid) { + this._seqid = seqid; +}; + +TBufferedTransport.prototype.ensureAvailable = function(len) { + if (this.readCursor + len > this.writeCursor) { + throw new InputBufferUnderrunError(); + } +}; + +TBufferedTransport.prototype.read = function(len) { + this.ensureAvailable(len); + var buf = new Buffer(len); + this.inBuf.copy(buf, 0, this.readCursor, this.readCursor + len); + this.readCursor += len; + return buf; +}; + +TBufferedTransport.prototype.readByte = function() { + this.ensureAvailable(1); + return binary.readByte(this.inBuf[this.readCursor++]); +}; + +TBufferedTransport.prototype.readI16 = function() { + this.ensureAvailable(2); + var i16 = binary.readI16(this.inBuf, this.readCursor); + this.readCursor += 2; + return i16; +}; + +TBufferedTransport.prototype.readI32 = function() { + this.ensureAvailable(4); + var i32 = binary.readI32(this.inBuf, this.readCursor); + this.readCursor += 4; + return i32; +}; + +TBufferedTransport.prototype.readDouble = function() { + this.ensureAvailable(8); + var d = binary.readDouble(this.inBuf, this.readCursor); + this.readCursor += 8; + return d; +}; + +TBufferedTransport.prototype.readString = function(len) { + this.ensureAvailable(len); + var str = this.inBuf.toString('utf8', this.readCursor, this.readCursor + len); + this.readCursor += len; + return str; +}; + +TBufferedTransport.prototype.borrow = function() { + var obj = {buf: this.inBuf, readIndex: this.readCursor, writeIndex: this.writeCursor}; + return obj; +}; + +TBufferedTransport.prototype.consume = function(bytesConsumed) { + this.readCursor += bytesConsumed; +}; + +TBufferedTransport.prototype.write = function(buf) { + if (typeof(buf) === "string") { + buf = new Buffer(buf, 'utf8'); + } + this.outBuffers.push(buf); + this.outCount += buf.length; +}; + +TBufferedTransport.prototype.flush = function() { + // If the seqid of the callback is available pass it to the onFlush + // Then remove the current seqid + var seqid = this._seqid; + this._seqid = null; + + if (this.outCount < 1) { + return; + } + + var msg = new Buffer(this.outCount), + pos = 0; + this.outBuffers.forEach(function(buf) { + buf.copy(msg, pos, 0); + pos += buf.length; + }); + + if (this.onFlush) { + // Passing seqid through this call to get it to the connection + this.onFlush(msg, seqid); + } + + this.outBuffers = []; + this.outCount = 0; +} http://git-wip-us.apache.org/repos/asf/thrift/blob/96f4f07b/lib/nodejs/lib/thrift/compact_protocol.js ---------------------------------------------------------------------- diff --git a/lib/nodejs/lib/thrift/compact_protocol.js b/lib/nodejs/lib/thrift/compact_protocol.js new file mode 100644 index 0000000..45d62f4 --- /dev/null +++ b/lib/nodejs/lib/thrift/compact_protocol.js @@ -0,0 +1,907 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +var log = require('./log'); +var Int64 = require('node-int64'); +var Thrift = require('./thrift'); +var Type = Thrift.Type; + +module.exports = TCompactProtocol; + +var POW_8 = Math.pow(2, 8); +var POW_24 = Math.pow(2, 24); +var POW_32 = Math.pow(2, 32); +var POW_40 = Math.pow(2, 40); +var POW_48 = Math.pow(2, 48); +var POW_52 = Math.pow(2, 52); +var POW_1022 = Math.pow(2, 1022); + +/** + * Constructor Function for the Compact Protocol. + * @constructor + * @param {object} [trans] - The underlying transport to read/write. + * @classdesc The Apache Thrift Protocol layer performs serialization + * of base types, the compact protocol serializes data in binary + * form with minimal space used for scalar values. + */ +function TCompactProtocol(trans) { + this.trans = trans; + this.lastField_ = []; + this.lastFieldId_ = 0; + this.string_limit_ = 0; + this.string_buf_ = null; + this.string_buf_size_ = 0; + this.container_limit_ = 0; + this.booleanField_ = { + name: null, + hasBoolValue: false + }; + this.boolValue_ = { + hasBoolValue: false, + boolValue: false + }; +}; + + +// +// Compact Protocol Constants +// + +/** + * Compact Protocol ID number. + * @readonly + * @const {number} PROTOCOL_ID + */ +TCompactProtocol.PROTOCOL_ID = -126; //1000 0010 + +/** + * Compact Protocol version number. + * @readonly + * @const {number} VERSION_N + */ +TCompactProtocol.VERSION_N = 1; + +/** + * Compact Protocol version mask for combining protocol version and message type in one byte. + * @readonly + * @const {number} VERSION_MASK + */ +TCompactProtocol.VERSION_MASK = 0x1f; //0001 1111 + +/** + * Compact Protocol message type mask for combining protocol version and message type in one byte. + * @readonly + * @const {number} TYPE_MASK + */ +TCompactProtocol.TYPE_MASK = -32; //1110 0000 + +/** + * Compact Protocol message type bits for ensuring message type bit size. + * @readonly + * @const {number} TYPE_BITS + */ +TCompactProtocol.TYPE_BITS = 7; //0000 0111 + +/** + * Compact Protocol message type shift amount for combining protocol version and message type in one byte. + * @readonly + * @const {number} TYPE_SHIFT_AMOUNT + */ +TCompactProtocol.TYPE_SHIFT_AMOUNT = 5; + +/** + * Compact Protocol type IDs used to keep type data within one nibble. + * @readonly + * @property {number} CT_STOP - End of a set of fields. + * @property {number} CT_BOOLEAN_TRUE - Flag for Boolean field with true value (packed field and value). + * @property {number} CT_BOOLEAN_FALSE - Flag for Boolean field with false value (packed field and value). + * @property {number} CT_BYTE - Signed 8 bit integer. + * @property {number} CT_I16 - Signed 16 bit integer. + * @property {number} CT_I32 - Signed 32 bit integer. + * @property {number} CT_I64 - Signed 64 bit integer (2^53 max in JavaScript). + * @property {number} CT_DOUBLE - 64 bit IEEE 854 floating point. + * @property {number} CT_BINARY - Array of bytes (used for strings also). + * @property {number} CT_LIST - A collection type (unordered). + * @property {number} CT_SET - A collection type (unordered and without repeated values). + * @property {number} CT_MAP - A collection type (map/associative-array/dictionary). + * @property {number} CT_STRUCT - A multifield type. + */ +TCompactProtocol.Types = { + CT_STOP: 0x00, + CT_BOOLEAN_TRUE: 0x01, + CT_BOOLEAN_FALSE: 0x02, + CT_BYTE: 0x03, + CT_I16: 0x04, + CT_I32: 0x05, + CT_I64: 0x06, + CT_DOUBLE: 0x07, + CT_BINARY: 0x08, + CT_LIST: 0x09, + CT_SET: 0x0A, + CT_MAP: 0x0B, + CT_STRUCT: 0x0C +}; + +/** + * Array mapping Compact type IDs to standard Thrift type IDs. + * @readonly + */ +TCompactProtocol.TTypeToCType = [ + TCompactProtocol.Types.CT_STOP, // T_STOP + 0, // unused + TCompactProtocol.Types.CT_BOOLEAN_TRUE, // T_BOOL + TCompactProtocol.Types.CT_BYTE, // T_BYTE + TCompactProtocol.Types.CT_DOUBLE, // T_DOUBLE + 0, // unused + TCompactProtocol.Types.CT_I16, // T_I16 + 0, // unused + TCompactProtocol.Types.CT_I32, // T_I32 + 0, // unused + TCompactProtocol.Types.CT_I64, // T_I64 + TCompactProtocol.Types.CT_BINARY, // T_STRING + TCompactProtocol.Types.CT_STRUCT, // T_STRUCT + TCompactProtocol.Types.CT_MAP, // T_MAP + TCompactProtocol.Types.CT_SET, // T_SET + TCompactProtocol.Types.CT_LIST, // T_LIST +]; + + +// +// Compact Protocol Utilities +// + +/** + * Returns the underlying transport layer. + * @return {object} The underlying transport layer. + */TCompactProtocol.prototype.getTransport = function() { + return this.trans; +}; + +/** + * Lookup a Compact Protocol Type value for a given Thrift Type value. + * N.B. Used only internally. + * @param {number} ttype - Thrift type value + * @returns {number} Compact protocol type value + */ +TCompactProtocol.prototype.getCompactType = function(ttype) { + return TCompactProtocol.TTypeToCType[ttype]; +}; + +/** + * Lookup a Thrift Type value for a given Compact Protocol Type value. + * N.B. Used only internally. + * @param {number} type - Compact Protocol type value + * @returns {number} Thrift Type value + */ +TCompactProtocol.prototype.getTType = function(type) { + switch (type) { + case Type.STOP: + return Type.STOP; + case TCompactProtocol.Types.CT_BOOLEAN_FALSE: + case TCompactProtocol.Types.CT_BOOLEAN_TRUE: + return Type.BOOL; + case TCompactProtocol.Types.CT_BYTE: + return Type.BYTE; + case TCompactProtocol.Types.CT_I16: + return Type.I16; + case TCompactProtocol.Types.CT_I32: + return Type.I32; + case TCompactProtocol.Types.CT_I64: + return Type.I64; + case TCompactProtocol.Types.CT_DOUBLE: + return Type.DOUBLE; + case TCompactProtocol.Types.CT_BINARY: + return Type.STRING; + case TCompactProtocol.Types.CT_LIST: + return Type.LIST; + case TCompactProtocol.Types.CT_SET: + return Type.SET; + case TCompactProtocol.Types.CT_MAP: + return Type.MAP; + case TCompactProtocol.Types.CT_STRUCT: + return Type.STRUCT; + default: + throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Unknown type: " + type); + } + return Type.STOP; +}; + + +// +// Compact Protocol write operations +// + +/** + * Send any buffered bytes to the end point. + */ +TCompactProtocol.prototype.flush = function() { + return this.trans.flush(); +}; + +/** + * Writes an RPC message header + * @param {string} name - The method name for the message. + * @param {number} type - The type of message (CALL, REPLY, EXCEPTION, ONEWAY). + * @param {number} seqid - The call sequence number (if any). + */ +TCompactProtocol.prototype.writeMessageBegin = function(name, type, seqid) { + this.writeByte(TCompactProtocol.PROTOCOL_ID); + this.writeByte((TCompactProtocol.VERSION_N & TCompactProtocol.VERSION_MASK) | + ((type << TCompactProtocol.TYPE_SHIFT_AMOUNT) & TCompactProtocol.TYPE_MASK)); + this.writeVarint32(seqid); + this.writeString(name); + + // Record client seqid to find callback again + if (this._seqid) { + // TODO better logging log warning + log.warning('SeqId already set', { 'name': name }); + } else { + this._seqid = seqid; + this.trans.setCurrSeqId(seqid); + } +}; + +TCompactProtocol.prototype.writeMessageEnd = function() { +}; + +TCompactProtocol.prototype.writeStructBegin = function(name) { + this.lastField_.push(this.lastFieldId_); + this.lastFieldId_ = 0; +}; + +TCompactProtocol.prototype.writeStructEnd = function() { + this.lastFieldId_ = this.lastField_.pop(); +}; + +/** + * Writes a struct field header + * @param {string} name - The field name (not written with the compact protocol). + * @param {number} type - The field data type (a normal Thrift field Type). + * @param {number} id - The IDL field Id. + */ +TCompactProtocol.prototype.writeFieldBegin = function(name, type, id) { + if (type != Type.BOOL) { + return this.writeFieldBeginInternal(name, type, id, -1); + } + + this.booleanField_.name = name; + this.booleanField_.fieldType = type; + this.booleanField_.fieldId = id; +}; + +TCompactProtocol.prototype.writeFieldEnd = function() { +}; + +TCompactProtocol.prototype.writeFieldStop = function() { + this.writeByte(TCompactProtocol.Types.CT_STOP); +}; + +/** + * Writes a map collection header + * @param {number} keyType - The Thrift type of the map keys. + * @param {number} valType - The Thrift type of the map values. + * @param {number} size - The number of k/v pairs in the map. + */ +TCompactProtocol.prototype.writeMapBegin = function(keyType, valType, size) { + if (size === 0) { + this.writeByte(0); + } else { + this.writeVarint32(size); + this.writeByte(this.getCompactType(keyType) << 4 | this.getCompactType(valType)); + } +}; + +TCompactProtocol.prototype.writeMapEnd = function() { +}; + +/** + * Writes a list collection header + * @param {number} elemType - The Thrift type of the list elements. + * @param {number} size - The number of elements in the list. + */ +TCompactProtocol.prototype.writeListBegin = function(elemType, size) { + this.writeCollectionBegin(elemType, size); +}; + +TCompactProtocol.prototype.writeListEnd = function() { +}; + +/** + * Writes a set collection header + * @param {number} elemType - The Thrift type of the set elements. + * @param {number} size - The number of elements in the set. + */ +TCompactProtocol.prototype.writeSetBegin = function(elemType, size) { + this.writeCollectionBegin(elemType, size); +}; + +TCompactProtocol.prototype.writeSetEnd = function() { +}; + +TCompactProtocol.prototype.writeBool = function(value) { + if (this.booleanField_.name !== null) { + // we haven't written the field header yet + this.writeFieldBeginInternal(this.booleanField_.name, + this.booleanField_.fieldType, + this.booleanField_.fieldId, + (value ? TCompactProtocol.Types.CT_BOOLEAN_TRUE + : TCompactProtocol.Types.CT_BOOLEAN_FALSE)); + this.booleanField_.name = null; + } else { + // we're not part of a field, so just write the value + this.writeByte((value ? TCompactProtocol.Types.CT_BOOLEAN_TRUE + : TCompactProtocol.Types.CT_BOOLEAN_FALSE)); + } +}; + +TCompactProtocol.prototype.writeByte = function(b) { + this.trans.write(new Buffer([b])); +}; + +TCompactProtocol.prototype.writeI16 = function(i16) { + this.writeVarint32(this.i32ToZigzag(i16)); +}; + +TCompactProtocol.prototype.writeI32 = function(i32) { + this.writeVarint32(this.i32ToZigzag(i32)); +}; + +TCompactProtocol.prototype.writeI64 = function(i64) { + this.writeVarint64(this.i64ToZigzag(i64)); +}; + +// Little-endian, unlike TBinaryProtocol +TCompactProtocol.prototype.writeDouble = function(v) { + var buff = new Buffer(8); + var m, e, c; + + buff[7] = (v < 0 ? 0x80 : 0x00); + + v = Math.abs(v); + if (v !== v) { + // NaN, use QNaN IEEE format + m = 2251799813685248; + e = 2047; + } else if (v === Infinity) { + m = 0; + e = 2047; + } else { + e = Math.floor(Math.log(v) / Math.LN2); + c = Math.pow(2, -e); + if (v * c < 1) { + e--; + c *= 2; + } + + if (e + 1023 >= 2047) + { + // Overflow + m = 0; + e = 2047; + } + else if (e + 1023 >= 1) + { + // Normalized - term order matters, as Math.pow(2, 52-e) and v*Math.pow(2, 52) can overflow + m = (v*c-1) * POW_52; + e += 1023; + } + else + { + // Denormalized - also catches the '0' case, somewhat by chance + m = (v * POW_1022) * POW_52; + e = 0; + } + } + + buff[6] = (e << 4) & 0xf0; + buff[7] |= (e >> 4) & 0x7f; + + buff[0] = m & 0xff; + m = Math.floor(m / POW_8); + buff[1] = m & 0xff; + m = Math.floor(m / POW_8); + buff[2] = m & 0xff; + m = Math.floor(m / POW_8); + buff[3] = m & 0xff; + m >>= 8; + buff[4] = m & 0xff; + m >>= 8; + buff[5] = m & 0xff; + m >>= 8; + buff[6] |= m & 0x0f; + + this.trans.write(buff); +}; + +TCompactProtocol.prototype.writeString = function(arg) { + this.writeBinary(arg); +}; + +TCompactProtocol.prototype.writeBinary = function(arg) { + if (typeof arg === 'string') { + this.writeVarint32(Buffer.byteLength(arg, 'utf8')) ; + this.trans.write(arg, 'utf8'); + } else if (arg instanceof Buffer) { + this.writeVarint32(arg.length); + this.trans.write(arg); + } else { + throw new Error('writeString/writeBinary called without a string/Buffer argument: ' + arg); + } +}; + + +// +// Compact Protocol internal write methods +// + +TCompactProtocol.prototype.writeFieldBeginInternal = function(name, + fieldType, + fieldId, + typeOverride) { + //If there's a type override, use that. + var typeToWrite = (typeOverride == -1 ? this.getCompactType(fieldType) : typeOverride); + //Check if we can delta encode the field id + if (fieldId > this.lastFieldId_ && fieldId - this.lastFieldId_ <= 15) { + //Include the type delta with the field ID + this.writeByte((fieldId - this.lastFieldId_) << 4 | typeToWrite); + } else { + //Write separate type and ID values + this.writeByte(typeToWrite); + this.writeI16(fieldId); + } + this.lastFieldId_ = fieldId; +}; + +TCompactProtocol.prototype.writeCollectionBegin = function(elemType, size) { + if (size <= 14) { + //Combine size and type in one byte if possible + this.writeByte(size << 4 | this.getCompactType(elemType)); + } else { + this.writeByte(0xf0 | this.getCompactType(elemType)); + this.writeVarint32(size); + } +}; + +/** + * Write an i32 as a varint. Results in 1-5 bytes on the wire. + */ +TCompactProtocol.prototype.writeVarint32 = function(n) { + var buf = new Buffer(5); + var wsize = 0; + while (true) { + if ((n & ~0x7F) === 0) { + buf[wsize++] = n; + break; + } else { + buf[wsize++] = ((n & 0x7F) | 0x80); + n = n >>> 7; + } + } + var wbuf = new Buffer(wsize); + buf.copy(wbuf,0,0,wsize); + this.trans.write(wbuf); +}; + +/** + * Write an i64 as a varint. Results in 1-10 bytes on the wire. + * N.B. node-int64 is always big endian + */ +TCompactProtocol.prototype.writeVarint64 = function(n) { + if (typeof n === "number"){ + n = new Int64(n); + } + if (! (n instanceof Int64)) { + throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Expected Int64 or Number, found: " + n); + } + + var buf = new Buffer(10); + var wsize = 0; + var hi = n.buffer.readUInt32BE(0, true); + var lo = n.buffer.readUInt32BE(4, true); + var mask = 0; + while (true) { + if (((lo & ~0x7F) === 0) && (hi === 0)) { + buf[wsize++] = lo; + break; + } else { + buf[wsize++] = ((lo & 0x7F) | 0x80); + mask = hi << 25; + lo = lo >>> 7; + hi = hi >>> 7; + lo = lo | mask; + } + } + var wbuf = new Buffer(wsize); + buf.copy(wbuf,0,0,wsize); + this.trans.write(wbuf); +}; + +/** + * Convert l into a zigzag long. This allows negative numbers to be + * represented compactly as a varint. + */ +TCompactProtocol.prototype.i64ToZigzag = function(l) { + if (typeof l === 'string') { + l = new Int64(parseInt(l, 10)); + } else if (typeof l === 'number') { + l = new Int64(l); + } + if (! (l instanceof Int64)) { + throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Expected Int64 or Number, found: " + l); + } + var hi = l.buffer.readUInt32BE(0, true); + var lo = l.buffer.readUInt32BE(4, true); + var sign = hi >>> 31; + hi = ((hi << 1) | (lo >>> 31)) ^ ((!!sign) ? 0xFFFFFFFF : 0); + lo = (lo << 1) ^ ((!!sign) ? 0xFFFFFFFF : 0); + return new Int64(hi, lo); +}; + +/** + * Convert n into a zigzag int. This allows negative numbers to be + * represented compactly as a varint. + */ +TCompactProtocol.prototype.i32ToZigzag = function(n) { + return (n << 1) ^ ((n & 0x80000000) ? 0xFFFFFFFF : 0); +}; + + +// +// Compact Protocol read operations +// + +TCompactProtocol.prototype.readMessageBegin = function() { + //Read protocol ID + var protocolId = this.trans.readByte(); + if (protocolId != TCompactProtocol.PROTOCOL_ID) { + throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "Bad protocol identifier " + protocolId); + } + + //Read Version and Type + var versionAndType = this.trans.readByte(); + var version = (versionAndType & TCompactProtocol.VERSION_MASK); + if (version != TCompactProtocol.VERSION_N) { + throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "Bad protocol version " + version); + } + var type = ((versionAndType >> TCompactProtocol.TYPE_SHIFT_AMOUNT) & TCompactProtocol.TYPE_BITS); + + //Read SeqId + var seqid = this.readVarint32(); + + //Read name + var name = this.readString(); + + return {fname: name, mtype: type, rseqid: seqid}; +}; + +TCompactProtocol.prototype.readMessageEnd = function() { +}; + +TCompactProtocol.prototype.readStructBegin = function() { + this.lastField_.push(this.lastFieldId_); + this.lastFieldId_ = 0; + return {fname: ''}; +}; + +TCompactProtocol.prototype.readStructEnd = function() { + this.lastFieldId_ = this.lastField_.pop(); +}; + +TCompactProtocol.prototype.readFieldBegin = function() { + var fieldId = 0; + var b = this.trans.readByte(b); + var type = (b & 0x0f); + + if (type == TCompactProtocol.Types.CT_STOP) { + return {fname: null, ftype: Thrift.Type.STOP, fid: 0}; + } + + //Mask off the 4 MSB of the type header to check for field id delta. + var modifier = ((b & 0x000000f0) >>> 4); + if (modifier === 0) { + //If not a delta read the field id. + fieldId = this.readI16(); + } else { + //Recover the field id from the delta + fieldId = (this.lastFieldId_ + modifier); + } + var fieldType = this.getTType(type); + + //Boolean are encoded with the type + if (type == TCompactProtocol.Types.CT_BOOLEAN_TRUE || + type == TCompactProtocol.Types.CT_BOOLEAN_FALSE) { + this.boolValue_.hasBoolValue = true; + this.boolValue_.boolValue = + (type == TCompactProtocol.Types.CT_BOOLEAN_TRUE ? true : false); + } + + //Save the new field for the next delta computation. + this.lastFieldId_ = fieldId; + return {fname: null, ftype: fieldType, fid: fieldId}; +}; + +TCompactProtocol.prototype.readFieldEnd = function() { +}; + +TCompactProtocol.prototype.readMapBegin = function() { + var msize = this.readVarint32(); + if (msize < 0) { + throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.NEGATIVE_SIZE, "Negative map size"); + } + + var kvType = 0; + if (msize !== 0) { + kvType = this.trans.readByte(); + } + + var keyType = this.getTType((kvType & 0xf0) >>> 4); + var valType = this.getTType(kvType & 0xf); + return {ktype: keyType, vtype: valType, size: msize}; +}; + +TCompactProtocol.prototype.readMapEnd = function() { +}; + +TCompactProtocol.prototype.readListBegin = function() { + var size_and_type = this.trans.readByte(); + + var lsize = (size_and_type >>> 4) & 0x0000000f; + if (lsize == 15) { + lsize = this.readVarint32(); + } + + if (lsize < 0) { + throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.NEGATIVE_SIZE, "Negative list size"); + } + + var elemType = this.getTType(size_and_type & 0x0000000f); + + return {etype: elemType, size: lsize}; +}; + +TCompactProtocol.prototype.readListEnd = function() { +}; + +TCompactProtocol.prototype.readSetBegin = function() { + return this.readListBegin(); +}; + +TCompactProtocol.prototype.readSetEnd = function() { +}; + +TCompactProtocol.prototype.readBool = function() { + var value = false; + var rsize = 0; + if (this.boolValue_.hasBoolValue === true) { + value = this.boolValue_.boolValue; + this.boolValue_.hasBoolValue = false; + } else { + var res = this.trans.readByte(); + rsize = res.rsize; + value = (res.value == TCompactProtocol.Types.CT_BOOLEAN_TRUE); + } + return value; +}; + +TCompactProtocol.prototype.readByte = function() { + return this.trans.readByte(); +}; + +TCompactProtocol.prototype.readI16 = function() { + return this.readI32(); +}; + +TCompactProtocol.prototype.readI32 = function() { + return this.zigzagToI32(this.readVarint32()); +}; + +TCompactProtocol.prototype.readI64 = function() { + return this.zigzagToI64(this.readVarint64()); +}; + +// Little-endian, unlike TBinaryProtocol +TCompactProtocol.prototype.readDouble = function() { + var buff = this.trans.read(8); + var off = 0; + + var signed = buff[off + 7] & 0x80; + var e = (buff[off+6] & 0xF0) >> 4; + e += (buff[off+7] & 0x7F) << 4; + + var m = buff[off]; + m += buff[off+1] << 8; + m += buff[off+2] << 16; + m += buff[off+3] * POW_24; + m += buff[off+4] * POW_32; + m += buff[off+5] * POW_40; + m += (buff[off+6] & 0x0F) * POW_48; + + switch (e) { + case 0: + e = -1022; + break; + case 2047: + return m ? NaN : (signed ? -Infinity : Infinity); + default: + m += POW_52; + e -= 1023; + } + + if (signed) { + m *= -1; + } + + return m * Math.pow(2, e - 52); +}; + +TCompactProtocol.prototype.readBinary = function() { + var size = this.readVarint32(); + // Catch empty string case + if (size === 0) { + return ""; + } + + // Catch error cases + if (size < 0) { + throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.NEGATIVE_SIZE, "Negative binary/string size"); + } + var value = this.trans.readString(size); + + return value; +}; + +TCompactProtocol.prototype.readString = function() { + return this.readBinary(); +}; + + +// +// Compact Protocol internal read operations +// + +/** + * Read an i32 from the wire as a varint. The MSB of each byte is set + * if there is another byte to follow. This can read up to 5 bytes. + */ +TCompactProtocol.prototype.readVarint32 = function() { + return this.readVarint64(); +}; + +/** + * Read an i64 from the wire as a proper varint. The MSB of each byte is set + * if there is another byte to follow. This can read up to 10 bytes. + */ +TCompactProtocol.prototype.readVarint64 = function() { + var rsize = 0; + var lo = 0; + var hi = 0; + var shift = 0; + while (true) { + var b = this.trans.readByte(); + rsize ++; + if (shift <= 25) { + lo = lo | ((b & 0x7f) << shift); + } else if (25 < shift && shift < 32) { + lo = lo | ((b & 0x7f) << shift); + hi = hi | ((b & 0x7f) >>> (32-shift)); + } else { + hi = hi | ((b & 0x7f) << (shift-32)); + } + shift += 7; + if (!(b & 0x80)) { + break; + } + if (rsize >= 10) { + throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Variable-length int over 10 bytes."); + } + } + var i64 = new Int64(hi, lo); + return i64.toNumber(); +}; + +/** + * Convert from zigzag int to int. + */ +TCompactProtocol.prototype.zigzagToI32 = function(n) { + return (n >>> 1) ^ (-1 * (n & 1)); +}; + +/** + * Convert from zigzag long to long. + */ +TCompactProtocol.prototype.zigzagToI64 = function(n) { + var zz = new Int64(n); + var hi = zz.buffer.readUInt32BE(0, true); + var lo = zz.buffer.readUInt32BE(4, true); + + var neg = new Int64(hi & 0, lo & 1); + neg._2scomp(); + var hi_neg = neg.buffer.readUInt32BE(0, true); + var lo_neg = neg.buffer.readUInt32BE(4, true); + + var hi_lo = (hi << 31); + hi = (hi >>> 1) ^ (hi_neg); + lo = ((lo >>> 1) | hi_lo) ^ (lo_neg); + var i64 = new Int64(hi, lo); + return i64.toNumber(); +}; + +TCompactProtocol.prototype.skip = function(type) { + switch (type) { + case Type.STOP: + return; + case Type.BOOL: + this.readBool(); + break; + case Type.BYTE: + this.readByte(); + break; + case Type.I16: + this.readI16(); + break; + case Type.I32: + this.readI32(); + break; + case Type.I64: + this.readI64(); + break; + case Type.DOUBLE: + this.readDouble(); + break; + case Type.STRING: + this.readString(); + break; + case Type.STRUCT: + this.readStructBegin(); + while (true) { + var r = this.readFieldBegin(); + if (r.ftype === Type.STOP) { + break; + } + this.skip(r.ftype); + this.readFieldEnd(); + } + this.readStructEnd(); + break; + case Type.MAP: + var mapBegin = this.readMapBegin(); + for (var i = 0; i < mapBegin.size; ++i) { + this.skip(mapBegin.ktype); + this.skip(mapBegin.vtype); + } + this.readMapEnd(); + break; + case Type.SET: + var setBegin = this.readSetBegin(); + for (var i2 = 0; i2 < setBegin.size; ++i2) { + this.skip(setBegin.etype); + } + this.readSetEnd(); + break; + case Type.LIST: + var listBegin = this.readListBegin(); + for (var i3 = 0; i3 < listBegin.size; ++i3) { + this.skip(listBegin.etype); + } + this.readListEnd(); + break; + default: + throw new Error("Invalid type: " + type); + } +}; http://git-wip-us.apache.org/repos/asf/thrift/blob/96f4f07b/lib/nodejs/lib/thrift/connection.js ---------------------------------------------------------------------- diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js index aa985df..e836e30 100644 --- a/lib/nodejs/lib/thrift/connection.js +++ b/lib/nodejs/lib/thrift/connection.js @@ -16,43 +16,47 @@ * specific language governing permissions and limitations * under the License. */ -var util = require('util'), - EventEmitter = require("events").EventEmitter, - net = require('net'), - tls = require('tls'), - ttransport = require('./transport'), - tprotocol = require('./protocol'), - thrift = require('./thrift'); +var util = require('util'); +var EventEmitter = require("events").EventEmitter; +var net = require('net'); +var tls = require('tls'); +var thrift = require('./thrift'); + +var TBufferedTransport = require('./buffered_transport'); +var TBinaryProtocol = require('./binary_protocol'); +var InputBufferUnderrunError = require('./input_buffer_underrun_error'); + +var createClient = require('./create_client'); var binary = require('./binary'); var Connection = exports.Connection = function(stream, options) { var self = this; EventEmitter.call(this); - + this.seqId2Service = {}; this.connection = stream; this.options = options || {}; - this.transport = this.options.transport || ttransport.TBufferedTransport; - this.protocol = this.options.protocol || tprotocol.TBinaryProtocol; + this.transport = this.options.transport || TBufferedTransport; + this.protocol = this.options.protocol || TBinaryProtocol; this.offline_queue = []; this.connected = false; this._debug = this.options.debug || false; - if (this.options.max_attempts && - !isNaN(this.options.max_attempts) && + if (this.options.max_attempts && + !isNaN(this.options.max_attempts) && this.options.max_attempts > 0) { this.max_attempts = +this.options.max_attempts; } this.retry_max_delay = null; - if (this.options.retry_max_delay !== undefined && - !isNaN(this.options.retry_max_delay) && + if (this.options.retry_max_delay !== undefined && + !isNaN(this.options.retry_max_delay) && this.options.retry_max_delay > 0) { this.retry_max_delay = this.options.retry_max_delay; } this.connect_timeout = false; - if (this.options.connect_timeout && - !isNaN(this.options.connect_timeout) && + if (this.options.connect_timeout && + !isNaN(this.options.connect_timeout) && this.options.connect_timeout > 0) { this.connect_timeout = +this.options.connect_timeout; } @@ -94,7 +98,7 @@ var Connection = exports.Connection = function(stream, options) { this.connection.addListener("error", function(err) { // Only emit the error if no-one else is listening on the connection // or if someone is listening on us - if (self.connection.listeners('error').length === 1 || + if (self.connection.listeners('error').length === 1 || self.listeners('error').length > 0) { self.emit("error", err); } @@ -123,12 +127,12 @@ var Connection = exports.Connection = function(stream, options) { // in seqId2Service. If the SeqId is found in the hash we need to // lookup the appropriate client for this call. // The connection.client object is a single client object when not - // multiplexing, when using multiplexing it is a service name keyed + // multiplexing, when using multiplexing it is a service name keyed // hash of client objects. //NOTE: The 2 way interdependencies between protocols, transports, // connections and clients in the Node.js implementation are irregular - // and make the implementation difficult to extend and maintain. We - // should bring this stuff inline with typical thrift I/O stack + // and make the implementation difficult to extend and maintain. We + // should bring this stuff inline with typical thrift I/O stack // operation soon. // --ra var service_name = self.seqId2Service[header.rseqid]; @@ -137,7 +141,7 @@ var Connection = exports.Connection = function(stream, options) { delete self.seqId2Service[header.rseqid]; } /*jshint -W083 */ - client._reqs[dummy_seqid] = function(err, success){ + client._reqs[dummy_seqid] = function(err, success){ transport_with_data.commitPosition(); var callback = client._reqs[header.rseqid]; @@ -152,14 +156,14 @@ var Connection = exports.Connection = function(stream, options) { client['recv_' + header.fname](message, header.mtype, dummy_seqid); } else { delete client._reqs[dummy_seqid]; - self.emit("error", + self.emit("error", new thrift.TApplicationException(thrift.TApplicationExceptionType.WRONG_METHOD_NAME, "Received a response to an unknown RPC function")); } } } catch (e) { - if (e instanceof ttransport.InputBufferUnderrunError) { + if (e instanceof InputBufferUnderrunError) { transport_with_data.rollbackPosition(); } else { @@ -266,19 +270,7 @@ exports.createSSLConnection = function(host, port, options) { }; -exports.createClient = function(cls, connection) { - if (cls.Client) { - cls = cls.Client; - } - var client = new cls(new connection.transport(undefined, function(buf) { - connection.write(buf); - }), connection.protocol); - - // TODO clean this up - connection.client = client; - - return client; -}; +exports.createClient = createClient; var child_process = require('child_process'); var StdIOConnection = exports.StdIOConnection = function(command, options) { @@ -293,8 +285,8 @@ var StdIOConnection = exports.StdIOConnection = function(command, options) { this._debug = options.debug || false; this.connection = child.stdin; this.options = options || {}; - this.transport = this.options.transport || ttransport.TBufferedTransport; - this.protocol = this.options.protocol || tprotocol.TBinaryProtocol; + this.transport = this.options.transport || TBufferedTransport; + this.protocol = this.options.protocol || TBinaryProtocol; this.offline_queue = []; if(this._debug === true){ @@ -344,7 +336,7 @@ var StdIOConnection = exports.StdIOConnection = function(command, options) { client['recv_' + header.fname](message, header.mtype, dummy_seqid); } catch (e) { - if (e instanceof ttransport.InputBufferUnderrunError) { + if (e instanceof InputBufferUnderrunError) { transport_with_data.rollbackPosition(); } else { @@ -372,17 +364,4 @@ exports.createStdIOConnection = function(command,options){ return new StdIOConnection(command,options); }; -exports.createStdIOClient = function(cls,connection) { - if (cls.Client) { - cls = cls.Client; - } - - var client = new cls(new connection.transport(undefined, function(buf) { - connection.write(buf); - }), connection.protocol); - - // TODO clean this up - connection.client = client; - - return client; -}; +exports.createStdIOClient = createClient; http://git-wip-us.apache.org/repos/asf/thrift/blob/96f4f07b/lib/nodejs/lib/thrift/create_client.js ---------------------------------------------------------------------- diff --git a/lib/nodejs/lib/thrift/create_client.js b/lib/nodejs/lib/thrift/create_client.js new file mode 100644 index 0000000..d6b77a8 --- /dev/null +++ b/lib/nodejs/lib/thrift/create_client.js @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +module.exports = createClient; + +/** + * Creates a new client object for the specified Thrift service. + * @param {object} ServiceClient - The module containing the generated service client + * @param {Connection} Connection - The connection to use. + * @returns {object} The client object. + */ +function createClient(ServiceClient, connection) { + // TODO validate required options and throw otherwise + if (ServiceClient.Client) { + ServiceClient = ServiceClient.Client; + } + // TODO detangle these initialization calls + // creating "client" requires + // - new service client instance + // + // New service client instance requires + // - new transport instance + // - protocol class reference + // + // New transport instance requires + // - Buffer to use (or none) + // - Callback to call on flush + + // Wrap the write method + var writeCb = function(buf, seqid) { + connection.write(buf, seqid); + }; + var transport = new connection.transport(undefined, writeCb); + var client = new ServiceClient(transport, connection.protocol); + transport.client = client; + connection.client = client; + return client; +}; http://git-wip-us.apache.org/repos/asf/thrift/blob/96f4f07b/lib/nodejs/lib/thrift/framed_transport.js ---------------------------------------------------------------------- diff --git a/lib/nodejs/lib/thrift/framed_transport.js b/lib/nodejs/lib/thrift/framed_transport.js new file mode 100644 index 0000000..6947925 --- /dev/null +++ b/lib/nodejs/lib/thrift/framed_transport.js @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +var binary = require('./binary'); +var InputBufferUnderrunError = require('./input_buffer_underrun_error'); + +module.exports = TFramedTransport; + +function TFramedTransport(buffer, callback) { + this.inBuf = buffer || new Buffer(0); + this.outBuffers = []; + this.outCount = 0; + this.readPos = 0; + this.onFlush = callback; +}; + +TFramedTransport.receiver = function(callback, seqid) { + var residual = null; + + return function(data) { + // Prepend any residual data from our previous read + if (residual) { + data = Buffer.concat([residual, data]); + residual = null; + } + + // framed transport + while (data.length) { + if (data.length < 4) { + // Not enough bytes to continue, save and resume on next packet + residual = data; + return; + } + var frameSize = binary.readI32(data, 0); + if (data.length < 4 + frameSize) { + // Not enough bytes to continue, save and resume on next packet + residual = data; + return; + } + + var frame = data.slice(4, 4 + frameSize); + residual = data.slice(4 + frameSize); + + callback(new TFramedTransport(frame), seqid); + + data = residual; + residual = null; + } + }; +}; + +TFramedTransport.prototype.commitPosition = function(){}, +TFramedTransport.prototype.rollbackPosition = function(){}, + + // TODO: Implement open/close support +TFramedTransport.prototype.isOpen = function() { + return true; +}; +TFramedTransport.prototype.open = function() {}; +TFramedTransport.prototype.close = function() {}; + + // Set the seqid of the message in the client + // So that callbacks can be found +TFramedTransport.prototype.setCurrSeqId = function(seqid) { + this._seqid = seqid; +}; + +TFramedTransport.prototype.ensureAvailable = function(len) { + if (this.readPos + len > this.inBuf.length) { + throw new InputBufferUnderrunError(); + } +}; + +TFramedTransport.prototype.read = function(len) { // this function will be used for each frames. + this.ensureAvailable(len); + var end = this.readPos + len; + + if (this.inBuf.length < end) { + throw new Error('read(' + len + ') failed - not enough data'); + } + + var buf = this.inBuf.slice(this.readPos, end); + this.readPos = end; + return buf; +}; + +TFramedTransport.prototype.readByte = function() { + this.ensureAvailable(1); + return binary.readByte(this.inBuf[this.readPos++]); +}; + +TFramedTransport.prototype.readI16 = function() { + this.ensureAvailable(2); + var i16 = binary.readI16(this.inBuf, this.readPos); + this.readPos += 2; + return i16; +}; + +TFramedTransport.prototype.readI32 = function() { + this.ensureAvailable(4); + var i32 = binary.readI32(this.inBuf, this.readPos); + this.readPos += 4; + return i32; +}; + +TFramedTransport.prototype.readDouble = function() { + this.ensureAvailable(8); + var d = binary.readDouble(this.inBuf, this.readPos); + this.readPos += 8; + return d; +}; + +TFramedTransport.prototype.readString = function(len) { + this.ensureAvailable(len); + var str = this.inBuf.toString('utf8', this.readPos, this.readPos + len); + this.readPos += len; + return str; +}; + +TFramedTransport.prototype.borrow = function() { + return { + buf: this.inBuf, + readIndex: this.readPos, + writeIndex: this.inBuf.length + }; +}; + +TFramedTransport.prototype.consume = function(bytesConsumed) { + this.readPos += bytesConsumed; +}; + +TFramedTransport.prototype.write = function(buf, encoding) { + if (typeof(buf) === "string") { + buf = new Buffer(buf, encoding || 'utf8'); + } + this.outBuffers.push(buf); + this.outCount += buf.length; +}; + +TFramedTransport.prototype.flush = function() { + // If the seqid of the callback is available pass it to the onFlush + // Then remove the current seqid + var seqid = this._seqid; + this._seqid = null; + + var out = new Buffer(this.outCount), + pos = 0; + this.outBuffers.forEach(function(buf) { + buf.copy(out, pos, 0); + pos += buf.length; + }); + + if (this.onFlush) { + // TODO: optimize this better, allocate one buffer instead of both: + var msg = new Buffer(out.length + 4); + binary.writeI32(msg, out.length); + out.copy(msg, 4, 0, out.length); + if (this.onFlush) { + // Passing seqid through this call to get it to the connection + this.onFlush(msg, seqid); + } + } + + this.outBuffers = []; + this.outCount = 0; +}; http://git-wip-us.apache.org/repos/asf/thrift/blob/96f4f07b/lib/nodejs/lib/thrift/http_connection.js ---------------------------------------------------------------------- diff --git a/lib/nodejs/lib/thrift/http_connection.js b/lib/nodejs/lib/thrift/http_connection.js index b7659bc..f3fcd74 100644 --- a/lib/nodejs/lib/thrift/http_connection.js +++ b/lib/nodejs/lib/thrift/http_connection.js @@ -21,8 +21,12 @@ var http = require('http'); var https = require('https'); var EventEmitter = require('events').EventEmitter; var thrift = require('./thrift'); -var ttransport = require('./transport'); -var tprotocol = require('./protocol'); + +var TBufferedTransport = require('./buffered_transport'); +var TBinaryProtocol = require('./binary_protocol'); +var InputBufferUnderrunError = require('./input_buffer_underrun_error'); + +var createClient = require('./create_client'); /** * @class @@ -30,14 +34,14 @@ var tprotocol = require('./protocol'); * @property {string} transport - The Thrift layered transport to use (TBufferedTransport, etc). * @property {string} protocol - The Thrift serialization protocol to use (TBinaryProtocol, etc.). * @property {string} path - The URL path to POST to (e.g. "/", "/mySvc", "/thrift/quoteSvc", etc.). - * @property {object} headers - A standard Node.js header hash, an object hash containing key/value + * @property {object} headers - A standard Node.js header hash, an object hash containing key/value * pairs where the key is the header name string and the value is the header value string. * @property {boolean} https - True causes the connection to use https, otherwise http is used. * @property {object} nodeOptions - Options passed on to node. * @example * //Use a connection that requires ssl/tls, closes the connection after each request, * // uses the buffered transport layer, uses the JSON protocol and directs RPC traffic - * // to https://thrift.example.com:9090/hello + * // to https://thrift.example.com:9090/hello * var thrift = require('thrift'); * var options = { * transport: thrift.TBufferedTransport, @@ -52,18 +56,18 @@ var tprotocol = require('./protocol'); */ /** - * Initializes a Thrift HttpConnection instance (use createHttpConnection() rather than + * Initializes a Thrift HttpConnection instance (use createHttpConnection() rather than * instantiating directly). * @constructor * @param {string} host - The host name or IP to connect to. * @param {number} port - The TCP port to connect to. * @param {ConnectOptions} options - The configuration options to use. - * @throws {error} Exceptions other than ttransport.InputBufferUnderrunError are rethrown + * @throws {error} Exceptions other than InputBufferUnderrunError are rethrown * @event {error} The "error" event is fired when a Node.js error event occurs during * request or response processing, in which case the node error is passed on. An "error" * event may also be fired when the connection can not map a response back to the * appropriate client (an internal error), generating a TApplicationException. - * @classdesc HttpConnection objects provide Thrift end point transport + * @classdesc HttpConnection objects provide Thrift end point transport * semantics implemented over the Node.js http.request() method. * @see {@link createHttpConnection} */ @@ -77,8 +81,8 @@ var HttpConnection = exports.HttpConnection = function(host, port, options) { this.host = host; this.port = port; this.https = this.options.https || false; - this.transport = this.options.transport || ttransport.TBufferedTransport; - this.protocol = this.options.protocol || tprotocol.TBinaryProtocol; + this.transport = this.options.transport || TBufferedTransport; + this.protocol = this.options.protocol || TBinaryProtocol; //Prepare Node.js options this.nodeOptions = { @@ -89,8 +93,8 @@ var HttpConnection = exports.HttpConnection = function(host, port, options) { headers: this.options.headers || {}, responseType: this.options.responseType || null }; - for (var attrname in this.options.nodeOptions) { - this.nodeOptions[attrname] = this.options.nodeOptions[attrname]; + for (var attrname in this.options.nodeOptions) { + this.nodeOptions[attrname] = this.options.nodeOptions[attrname]; } /*jshint -W069 */ if (! this.nodeOptions.headers['Connection']) { @@ -98,7 +102,7 @@ var HttpConnection = exports.HttpConnection = function(host, port, options) { } /*jshint +W069 */ - //The sequence map is used to map seqIDs back to the + //The sequence map is used to map seqIDs back to the // calling client in multiplexed scenarios this.seqId2Service = {}; @@ -112,13 +116,13 @@ var HttpConnection = exports.HttpConnection = function(host, port, options) { //The Multiplexed Protocol stores a hash of seqid to service names // in seqId2Service. If the SeqId is found in the hash we need to // lookup the appropriate client for this call. - // The client var is a single client object when not multiplexing, + // The client var is a single client object when not multiplexing, // when using multiplexing it is a service name keyed hash of client // objects. //NOTE: The 2 way interdependencies between protocols, transports, // connections and clients in the Node.js implementation are irregular - // and make the implementation difficult to extend and maintain. We - // should bring this stuff inline with typical thrift I/O stack + // and make the implementation difficult to extend and maintain. We + // should bring this stuff inline with typical thrift I/O stack // operation soon. // --ra var service_name = self.seqId2Service[header.rseqid]; @@ -127,7 +131,7 @@ var HttpConnection = exports.HttpConnection = function(host, port, options) { delete self.seqId2Service[header.rseqid]; } /*jshint -W083 */ - client._reqs[dummy_seqid] = function(err, success){ + client._reqs[dummy_seqid] = function(err, success){ transport_with_data.commitPosition(); var clientCallback = client._reqs[header.rseqid]; delete client._reqs[header.rseqid]; @@ -150,15 +154,15 @@ var HttpConnection = exports.HttpConnection = function(host, port, options) { } } catch (e) { - if (e instanceof ttransport.InputBufferUnderrunError) { + if (e instanceof InputBufferUnderrunError) { transport_with_data.rollbackPosition(); } else { throw e; } } } - - + + //Response handler ////////////////////////////////////////////////// this.responseCallback = function(response) { @@ -180,14 +184,14 @@ var HttpConnection = exports.HttpConnection = function(host, port, options) { } else { data.push(chunk); } - dataLen += chunk.length; + dataLen += chunk.length; }); response.on('end', function(){ - var buf = new Buffer(dataLen); - for (var i=0, len=data.length, pos=0; i<len; i++) { - data[i].copy(buf, pos); - pos += data[i].length; + var buf = new Buffer(dataLen); + for (var i=0, len=data.length, pos=0; i<len; i++) { + data[i].copy(buf, pos); + pos += data[i].length; } //Get the receiver function for the transport and // call it with the buffer @@ -201,8 +205,8 @@ util.inherits(HttpConnection, EventEmitter); * Writes Thrift message data to the connection * @param {Buffer} data - A Node.js Buffer containing the data to write * @returns {void} No return value. - * @event {error} the "error" event is raised upon request failure passing the - * Node.js error object to the listener. + * @event {error} the "error" event is raised upon request failure passing the + * Node.js error object to the listener. */ HttpConnection.prototype.write = function(data) { var self = this; @@ -212,7 +216,7 @@ HttpConnection.prototype.write = function(data) { http.request(self.nodeOptions, self.responseCallback); req.on('error', function(err) { self.emit("error", err); - }); + }); req.write(data); req.end(); }; @@ -230,20 +234,5 @@ exports.createHttpConnection = function(host, port, options) { return new HttpConnection(host, port, options); }; -/** - * Creates a new client object for the specified Thrift service. - * @param {object} cls - The module containing the service client - * @param {HttpConnection} httpConnection - The connection to use. - * @returns {object} The client object. - * @see {@link createHttpConnection} - */ -exports.createHttpClient = function(cls, httpConnection) { - if (cls.Client) { - cls = cls.Client; - } - httpConnection.client = - new cls(new httpConnection.transport(undefined, function(buf) {httpConnection.write(buf);}), - httpConnection.protocol); - return httpConnection.client; -}; +exports.createHttpClient = createClient http://git-wip-us.apache.org/repos/asf/thrift/blob/96f4f07b/lib/nodejs/lib/thrift/index.js ---------------------------------------------------------------------- diff --git a/lib/nodejs/lib/thrift/index.js b/lib/nodejs/lib/thrift/index.js index 9b53dd0..e313dbb 100644 --- a/lib/nodejs/lib/thrift/index.js +++ b/lib/nodejs/lib/thrift/index.js @@ -55,8 +55,8 @@ exports.MultiplexedProcessor = mprocessor.MultiplexedProcessor; * Export transport and protocol so they can be used outside of a * cassandra/server context */ -exports.TFramedTransport = require('./transport').TFramedTransport; -exports.TBufferedTransport = require('./transport').TBufferedTransport; -exports.TBinaryProtocol = require('./protocol').TBinaryProtocol; -exports.TJSONProtocol = require('./protocol').TJSONProtocol; -exports.TCompactProtocol = require('./protocol').TCompactProtocol; +exports.TFramedTransport = require('./framed_transport'); +exports.TBufferedTransport = require('./buffered_transport'); +exports.TBinaryProtocol = require('./binary_protocol'); +exports.TJSONProtocol = require('./json_protocol'); +exports.TCompactProtocol = require('./compact_protocol'); http://git-wip-us.apache.org/repos/asf/thrift/blob/96f4f07b/lib/nodejs/lib/thrift/input_buffer_underrun_error.js ---------------------------------------------------------------------- diff --git a/lib/nodejs/lib/thrift/input_buffer_underrun_error.js b/lib/nodejs/lib/thrift/input_buffer_underrun_error.js new file mode 100644 index 0000000..4d4237b --- /dev/null +++ b/lib/nodejs/lib/thrift/input_buffer_underrun_error.js @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +var util = require("util"); + +module.exports = InputBufferUnderrunError; + +function InputBufferUnderrunError(message) { + Error.call(this, message); +}; + +util.inherits(InputBufferUnderrunError, Error);
