http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/abd646b2/proton-c/bindings/javascript/message.js ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/message.js b/proton-c/bindings/javascript/message.js new file mode 100644 index 0000000..564cc6e --- /dev/null +++ b/proton-c/bindings/javascript/message.js @@ -0,0 +1,848 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/*****************************************************************************/ +/* */ +/* Message */ +/* */ +/*****************************************************************************/ + +/** + * Constructs a proton.Message instance. + * @classdesc This class is a mutable holder of message content that may be used + * to generate and encode or decode and access AMQP formatted message data. + * @constructor proton.Message + * @property {object} instructions delivery instructions for the message. + * @property {object} annotations infrastructure defined message annotations. + * @property {object} properties application defined message properties. + * @property {object} body message body as a native JavaScript Object. + * @property {object} data message body as a proton.Data Object. + */ +Module['Message'] = function() { // Message Constructor. + this._message = _pn_message(); + this._id = new Data(_pn_message_id(this._message)); + this._correlationId = new Data(_pn_message_correlation_id(this._message)); + + // ************************* Public properties **************************** + + this['instructions'] = null; + this['annotations'] = null; + + // Intitialise with an empty Object so we can set properties in a natural way. + // message.properties.prop1 = "foo"; + // message.properties.prop2 = "bar"; + this['properties'] = {}; + + this['body'] = null; + this['data'] = null; +}; + +// Expose constructor as package scope variable to make internal calls less verbose. +var Message = Module['Message']; + +// Expose prototype as a variable to make method declarations less verbose. +var _Message_ = Message.prototype; + +// ************************** Class properties ******************************** + +Message['DEFAULT_PRIORITY'] = 4; /** Default priority for messages.*/ + +// ************************* Protected methods ******************************** + +// We use the dot notation rather than associative array form for protected +// methods so they are visible to this "package", but the Closure compiler will +// minify and obfuscate names, effectively making a defacto "protected" method. + +/** + * This helper method checks the supplied error code, converts it into an + * exception and throws the exception. This method will try to use the message + * populated in pn_message_error(), if present, but if not it will fall + * back to using the basic error code rendering from pn_code(). + * @param code the error code to check. + */ +_Message_._check = function(code) { + if (code < 0) { + var errno = this['getErrno'](); + var message = errno ? this['getError']() : Pointer_stringify(_pn_code(code)); + + throw new Module['MessageError'](message); + } else { + return code; + } +}; + +/** + * Encode the Message prior to sending on the wire. + */ +_Message_._preEncode = function() { + // A Message Object may be reused so we create new Data instances and clear + // the state for them each time put() gets called. + var inst = new Data(_pn_message_instructions(this._message)); + var ann = new Data(_pn_message_annotations(this._message)); + var props = new Data(_pn_message_properties(this._message)); + var body = new Data(_pn_message_body(this._message)); + + inst.clear(); + if (this['instructions']) { + inst['putObject'](this['instructions']); + } + + ann.clear(); + if (this['annotations']) { + ann['putObject'](this['annotations']); + } + + props.clear(); + if (this['properties']) { + props['putObject'](this['properties']); + } + + body.clear(); + if (this['body']) { + var contentType = this['getContentType'](); + if (contentType) { + var value = this['body']; + if (contentType === 'application/json' && JSON) { // Optionally encode body as JSON. + var json = JSON.stringify(value); + value = new Data['Binary'](json); + } else if (!(value instanceof Data['Binary'])) { // Construct a Binary from the body + value = new Data['Binary'](value); + } + // As content-type is set we send as an opaque AMQP data section. + this['setInferred'](true); + body['putBINARY'](value); + } else { // By default encode body using the native AMQP type system. + this['setInferred'](false); + body['putObject'](this['body']); + } + } +}; + +/** + * Decode the Message after receiving off the wire. + * @param {boolean} decodeBinaryAsString if set decode any AMQP Binary payload + * objects as strings. This can be useful as the data in Binary objects + * will be overwritten with subsequent calls to get, so they must be + * explicitly copied. Needless to say it is only safe to set this flag if + * you know that the data you are dealing with is actually a string, for + * example C/C++ applications often seem to encode strings as AMQP binary, + * a common cause of interoperability problems. + */ +_Message_._postDecode = function(decodeBinaryAsString) { + var inst = new Data(_pn_message_instructions(this._message)); + var ann = new Data(_pn_message_annotations(this._message)); + var props = new Data(_pn_message_properties(this._message)); + var body = new Data(_pn_message_body(this._message), decodeBinaryAsString); + + if (inst.next()) { + this['instructions'] = inst['getObject'](); + } else { + this['instructions'] = {}; + } + + if (ann.next()) { + this['annotations'] = ann['getObject'](); + } else { + this['annotations'] = {}; + } + + if (props.next()) { + this['properties'] = props['getObject'](); + } else { + this['properties'] = {}; + } + + if (body.next()) { + this['data'] = body; + this['body'] = body['getObject'](); + var contentType = this['getContentType'](); + if (contentType) { + if (contentType === 'application/json' && JSON) { + var json = this['body'].toString(); // Convert Binary to String. + this['body'] = JSON.parse(json); + } else if (contentType.indexOf('text/') === 0) { // It's a text/* MIME type + this['body'] = this['body'].toString(); // Convert Binary to String. + } + } + } else { + this['data'] = null; + this['body'] = null; + } +}; + +// *************************** Public methods ********************************* + +/** + * Free the Message. + * <p> + * N.B. This method has to be called explicitly in JavaScript as we can't + * intercept finalisers, so we need to remember to free before removing refs. + * @method free + * @memberof! proton.Message# + */ +_Message_['free'] = function() { + _pn_message_free(this._message); +}; + +/** + * @method getErrno + * @memberof! proton.Message# + * @returns {number the most recent error message code. + */ +_Message_['getErrno'] = function() { + return _pn_message_errno(this._message); +}; + +/** + * @method getError + * @memberof! proton.Message# + * @returns {string} the most recent error message as a String. + */ +_Message_['getError'] = function() { + return Pointer_stringify(_pn_error_text(_pn_message_error(this._message))); +}; + +/** + * Clears the contents of the Message. All fields will be reset to their default values. + * @method clear + * @memberof! proton.Message# + */ +_Message_['clear'] = function() { + _pn_message_clear(this._message); + this['instructions'] = null; + this['annotations'] = null; + this['properties'] = {}; + this['body'] = null; + this['data'] = null; +}; + +/** + * Get the inferred flag for a message. + * <p> + * The inferred flag for a message indicates how the message content + * is encoded into AMQP sections. If inferred is true then binary and + * list values in the body of the message will be encoded as AMQP DATA + * and AMQP SEQUENCE sections, respectively. If inferred is false, + * then all values in the body of the message will be encoded as AMQP + * VALUE sections regardless of their type. Use + * {@link proton.Message.setInferred} to set the value. + * @method isInferred + * @memberof! proton.Message# + * @returns {boolean} true iff the inferred flag for the message is set. + */ +_Message_['isInferred'] = function() { + return (_pn_message_is_inferred(this._message) > 0); +}; + +/** + * Set the inferred flag for a message. See {@link proton.Message.isInferred} + * for a description of what the inferred flag is. + * @method setInferred + * @memberof! proton.Message# + * @param {boolean} inferred the new value of the inferred flag. + */ +_Message_['setInferred'] = function(inferred) { + this._check(_pn_message_set_inferred(this._message, inferred)); +}; + +/** + * Get the durable flag for a message. + * <p> + * The durable flag indicates that any parties taking responsibility + * for the message must durably store the content. Use + * {@link proton.Message.setDurable} to set the value. + * @method isDurable + * @memberof! proton.Message# + * @returns {boolean} true iff the durable flag for the message is set. + */ +_Message_['isDurable'] = function() { + return (_pn_message_is_durable(this._message) > 0); +}; + +/** + * Set the durable flag for a message. See {@link proton.Message.isDurable} + * for a description of what the durable flag is. + * @method setDurable + * @memberof! proton.Message# + * @param {boolean} durable the new value of the durable flag. + */ +_Message_['setDurable'] = function(durable) { + this._check(_pn_message_set_durable(this._message, durable)); +}; + +/** + * Get the priority for a message. + * <p> + * The priority of a message impacts ordering guarantees. Within a + * given ordered context, higher priority messages may jump ahead of + * lower priority messages. Priority range is 0..255 + * @method getPriority + * @memberof! proton.Message# + * @returns {number} the priority of the Message. + */ +_Message_['getPriority'] = function() { + return _pn_message_get_priority(this._message) & 0xFF; // & 0xFF converts to unsigned. +}; + +/** + * Set the priority of the Message. See {@link proton.Message.getPriority} + * for details on message priority. + * @method setPriority + * @memberof! proton.Message# + * @param {number} priority the address we want to send the Message to. + */ +_Message_['setPriority'] = function(priority) { + this._check(_pn_message_set_priority(this._message, priority)); +}; + +/** + * Get the ttl for a message. + * <p> + * The ttl for a message determines how long a message is considered + * live. When a message is held for retransmit, the ttl is + * decremented. Once the ttl reaches zero, the message is considered + * dead. Once a message is considered dead it may be dropped. Use + * {@link proton.Message.setTTL} to set the ttl for a message. + * @method getTTL + * @memberof! proton.Message# + * @returns {number} the ttl in milliseconds. + */ +_Message_['getTTL'] = function() { + return _pn_message_get_ttl(this._message); +}; + +/** + * Set the ttl for a message. See {@link proton.Message.getTTL} + * for a detailed description of message ttl. + * @method setTTL + * @memberof! proton.Message# + * @param {number} ttl the new value for the message ttl in milliseconds. + */ +_Message_['setTTL'] = function(ttl) { + this._check(_pn_message_set_ttl(this._message, ttl)); +}; + +/** + * Get the first acquirer flag for a message. + * <p> + * When set to true, the first acquirer flag for a message indicates + * that the recipient of the message is the first recipient to acquire + * the message, i.e. there have been no failed delivery attempts to + * other acquirers. Note that this does not mean the message has not + * been delivered to, but not acquired, by other recipients. + * @method isFirstAcquirer + * @memberof! proton.Message# + * @returns {boolean} true iff the first acquirer flag for the message is set. + */ +_Message_['isFirstAcquirer'] = function() { + return (_pn_message_is_first_acquirer(this._message) > 0); +}; + +/** + * Set the first acquirer flag for a message. See {@link proton.Message.isFirstAcquirer} + * for details on the first acquirer flag. + * @method setFirstAcquirer + * @memberof! proton.Message# + * @param {boolean} first the new value of the first acquirer flag. + */ +_Message_['setFirstAcquirer'] = function(first) { + this._check(_pn_message_set_first_acquirer(this._message, first)); +}; + +/** + * Get the delivery count for a message. + * <p> + * The delivery count field tracks how many attempts have been made to + * deliver a message. Use {@link proton.Message.setDeliveryCount} to set + * the delivery count for a message. + * @method getDeliveryCount + * @memberof! proton.Message# + * @returns {number} the delivery count for the message. + */ +_Message_['getDeliveryCount'] = function() { + return _pn_message_get_delivery_count(this._message); +}; + +/** + * Set the delivery count for a message. See {@link proton.Message.getDeliveryCount} + * for details on what the delivery count means. + * @method setDeliveryCount + * @memberof! proton.Message# + * @param {number} count the new delivery count. + */ +_Message_['setDeliveryCount'] = function(count) { + this._check(_pn_message_set_delivery_count(this._message, count)); +}; + +/** + * Get the id for a message. + * <p> + * The message id provides a globally unique identifier for a message. + * A message id can be an a string, an unsigned long, a uuid or a binary value. + * @method getID + * @memberof! proton.Message# + * @returns {(number|string|proton.Data.Long|proton.Data.Uuid|proton.Data.Binary)} the message id. + */ +_Message_['getID'] = function() { + return this._id['getObject'](); +}; + +/** + * Set the id for a message. See {@link proton.Message.getID} + * for more details on the meaning of the message id. Note that only string, + * unsigned long, uuid, or binary values are permitted. + * @method setID + * @memberof! proton.Message# + * @param {(number|string|proton.Data.Long|proton.Data.Uuid|proton.Data.Binary)} id the + * new value of the message id. + */ +_Message_['setID'] = function(id) { + this._id['rewind'](); + if (Data.isNumber(id)) { + this._id['putULONG'](id); + } else { + this._id['putObject'](id); + } +}; + +/** + * Get the user id of the message creator. + * <p> + * The underlying raw data of the returned {@link proton.Data.Binary} will be + * valid until any one of the following operations occur: + * <pre> + * - {@link proton.Message.free} + * - {@link proton.Message.clear} + * - {@link proton.Message.setUserID} + * </pre> + * @method getUserID + * @memberof! proton.Message# + * @returns {proton.Data.Binary} the message's user id. + */ +_Message_['getUserID'] = function() { + var sp = Runtime.stackSave(); + // The implementation here is a bit "quirky" due to some low-level details + // of the interaction between emscripten and LLVM and the use of pn_bytes. + // The JavaScript code below is basically a binding to: + // + // pn_bytes_t bytes = pn_message_get_user_id(message); + + // Here's the quirky bit, pn_message_get_user_id actually returns pn_bytes_t + // *by value* but the low-level code handles this *by pointer* so we first + // need to allocate 8 bytes storage for {size, start} on the emscripten stack + // and then we pass the pointer to that storage as the first parameter to the + // compiled pn_message_get_user_id. + var bytes = allocate(8, 'i8', ALLOC_STACK); + _pn_message_get_user_id(bytes, this._message); + + // The bytes variable is really of type pn_bytes_t* so we use emscripten's + // getValue() call to retrieve the size and then the start pointer. + var size = getValue(bytes, 'i32'); + var start = getValue(bytes + 4, '*'); + + // Create a proton.Data.Binary from the pn_bytes_t information. + var binary = new Data['Binary'](size, start); + + // Tidy up the memory that we allocated on emscripten's stack. + Runtime.stackRestore(sp); + + return binary; +}; + +/** + * Set the user id for a message. This method takes a {@link proton.Data.Binary} + * consuming the underlying raw data in the process. For convenience this method + * also accepts a {@link proton.Data.Uuid}, number or string, converting them to a + * Binary internally. N.B. getUserID always returns a {@link proton.Data.Binary} + * even if a string or {@link proton.Data.Uuid} has been passed to setUserID. + * @method setUserID + * @memberof! proton.Message# + * @param {(string|proton.Data.Uuid)} id the new user id for the message. + */ +_Message_['setUserID'] = function(id) { + // If the id parameter is a proton.Data.Binary use it otherwise create a Binary + // using the string form of the parameter that was passed. + id = (id instanceof Data['Binary']) ? id : new Data['Binary']('' + id); + + var sp = Runtime.stackSave(); + // The implementation here is a bit "quirky" due to some low-level details + // of the interaction between emscripten and LLVM and the use of pn_bytes. + // The JavaScript code below is basically a binding to: + // + // pn_message_set_user_id(message, pn_bytes(id.size, id.start)); + + // Here's the quirky bit, pn_bytes actually returns pn_bytes_t *by value* but + // the low-level code handles this *by pointer* so we first need to allocate + // 8 bytes storage for {size, start} on the emscripten stack and then we + // pass the pointer to that storage as the first parameter to the pn_bytes. + var bytes = allocate(8, 'i8', ALLOC_STACK); + _pn_bytes(bytes, id.size, id.start); + + // The compiled pn_message_set_user_id takes the pn_bytes_t by reference not value. + this._check(_pn_message_set_user_id(this._message, bytes)); + + // After calling _pn_message_set_user_id the underlying Message object "owns" the + // binary data, so we can call free on the proton.Data.Binary instance to + // release any storage it has acquired back to the emscripten heap. + id['free'](); + Runtime.stackRestore(sp); +}; + +/** + * Get the address for a message. + * @method getAddress + * @memberof! proton.Message# + * @returns {string} the address of the Message. + */ +_Message_['getAddress'] = function() { + return Pointer_stringify(_pn_message_get_address(this._message)); +}; + +/** + * Set the address of the Message. + * @method setAddress + * @memberof! proton.Message# + * @param {string} address the address we want to send the Message to. + */ +_Message_['setAddress'] = function(address) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_address(this._message, allocate(intArrayFromString(address), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + +/** + * Get the subject for a message. + * @method getSubject + * @memberof! proton.Message# + * @returns {string} the subject of the Message. + */ +_Message_['getSubject'] = function() { + return Pointer_stringify(_pn_message_get_subject(this._message)); +}; + +/** + * Set the subject of the Message. + * @method setSubject + * @memberof! proton.Message# + * @param {string} subject the subject we want to set for the Message. + */ +_Message_['setSubject'] = function(subject) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_subject(this._message, allocate(intArrayFromString(subject), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + +/** + * Get the reply to for a message. + * @method getReplyTo + * @memberof! proton.Message# + * @returns {string} the reply to of the Message. + */ +_Message_['getReplyTo'] = function() { + return Pointer_stringify(_pn_message_get_reply_to(this._message)); +}; + +/** + * Set the reply to for a message. + * @method setReplyTo + * @memberof! proton.Message# + * @param {string} reply the reply to we want to set for the Message. + */ +_Message_['setReplyTo'] = function(reply) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_reply_to(this._message, allocate(intArrayFromString(reply), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + +/** + * Get the correlation id for a message. + * <p> + * A correlation id can be an a string, an unsigned long, a uuid or a binary value. + * @method getCorrelationID + * @memberof! proton.Message# + * @returns {(number|string|proton.Data.Long|proton.Data.Uuid|proton.Data.Binary)} the message id. + */ +_Message_['getCorrelationID'] = function() { + return this._correlationId['getObject'](); +}; + +/** + * Set the correlation id for a message. See {@link proton.Message.getCorrelationID} + * for more details on the meaning of the correlation id. Note that only string, + * unsigned long, uuid, or binary values are permitted. + * @method setCorrelationID + * @memberof! proton.Message# + * @param {(number|string|proton.Data.Long|proton.Data.Uuid|proton.Data.Binary)} id the + * new value of the correlation id. + */ +_Message_['setCorrelationID'] = function(id) { + this._correlationId['rewind'](); + if (Data.isNumber(id)) { + this._correlationId['putULONG'](id); + } else { + this._correlationId['putObject'](id); + } +}; + +/** + * Get the content type for a message. + * @method getContentType + * @memberof! proton.Message# + * @returns {string} the content type of the Message. + */ +_Message_['getContentType'] = function() { + return Pointer_stringify(_pn_message_get_content_type(this._message)); +}; + +/** + * Set the content type for a message. + * @method setContentType + * @memberof! proton.Message# + * @param {string} type the content type we want to set for the Message. + */ +_Message_['setContentType'] = function(type) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_content_type(this._message, allocate(intArrayFromString(type), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + +/** + * Get the content encoding for a message. + * @method getContentEncoding + * @memberof! proton.Message# + * @returns {string} the content encoding of the Message. + */ +_Message_['getContentEncoding'] = function() { + return Pointer_stringify(_pn_message_get_content_encoding(this._message)); +}; + +/** + * Set the content encoding for a message. + * @method setContentEncoding + * @memberof! proton.Message# + * @param {string} encoding the content encoding we want to set for the Message. + */ +_Message_['setContentEncoding'] = function(encoding) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_content_encoding(this._message, allocate(intArrayFromString(encoding), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + +/** + * Get the expiry time for a message. + * A zero value for the expiry time indicates that the message will + * never expire. This is the default value. + * @method getExpiryTime + * @memberof! proton.Message# + * @returns {Date} the expiry time for the message. + */ +_Message_['getExpiryTime'] = function() { + // Getting the timestamp is a little tricky as it is a 64 bit number. The way + // emscripten handles this is to return the low 32 bits directly and pass + // the high 32 bits via the tempRet0 variable. We use Data.Long to hold + // the 64 bit number and Data.Long.toNumber() to convert it back into a + // JavaScript number. + var low = _pn_message_get_expiry_time(this._message); + var high = Runtime.getTempRet0(); + var long = new Data.Long(low, high); + long = long.toNumber(); + return new Date(long); +}; + +/** + * Set the expiry time for a message. + * @method setExpiryTime + * @memberof! proton.Message# + * @param {(number|Date)} time the new expiry time for the message. + */ +_Message_['setExpiryTime'] = function(time) { + // Note that a timestamp is a 64 bit number so we have to use a proton.Data.Long. + var timestamp = Data.Long.fromNumber(time.valueOf()); + this._check(_pn_message_set_expiry_time(this._message, timestamp.getLowBitsUnsigned(), timestamp.getHighBits())); +}; + +/** + * Get the creation time for a message. + * A zero value for the creation time indicates that the creation time + * has not been set. This is the default value. + * @method getCreationTime + * @memberof! proton.Message# + * @returns {Date} the creation time for the message. + */ +_Message_['getCreationTime'] = function() { + // Getting the timestamp is a little tricky as it is a 64 bit number. The way + // emscripten handles this is to return the low 32 bits directly and pass + // the high 32 bits via the tempRet0 variable. We use Data.Long to hold + // the 64 bit number and Data.Long.toNumber() to convert it back into a + // JavaScript number. + var low = _pn_message_get_creation_time(this._message); + var high = Runtime.getTempRet0(); + var long = new Data.Long(low, high); + long = long.toNumber(); + return new Date(long); +}; + +/** + * Set the creation time for a message. + * @method setCreationTime + * @memberof! proton.Message# + * @param {(number|Date)} time the new creation time for the message. + */ +_Message_['setCreationTime'] = function(time) { + // Note that a timestamp is a 64 bit number so we have to use a proton.Data.Long. + var timestamp = Data.Long.fromNumber(time.valueOf()); + this._check(_pn_message_set_creation_time(this._message, timestamp.getLowBitsUnsigned(), timestamp.getHighBits())); +}; + +/** + * Get the group id for a message. + * @method getGroupID + * @memberof! proton.Message# + * @returns {string} the group id of the Message. + */ +_Message_['getGroupID'] = function() { + return Pointer_stringify(_pn_message_get_group_id(this._message)); +}; + +/** + * Set the group id for a message. + * @method setGroupID + * @memberof! proton.Message# + * @param {string} id the group id we want to set for the Message. + */ +_Message_['setGroupID'] = function(id) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_group_id(this._message, allocate(intArrayFromString(id), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + +/** + * Get the group sequence for a message. + * <p> + * The group sequence of a message identifies the relative ordering of + * messages within a group. The default value for the group sequence + * of a message is zero. + * @method getGroupSequence + * @memberof! proton.Message# + * @returns {number} the group sequence for the message. + */ +_Message_['getGroupSequence'] = function() { + return _pn_message_get_group_sequence(this._message); +}; + +/** + * Set the group sequence for a message. See {@link proton.Message.getGroupSequence} + * for details on what the group sequence means. + * @method setGroupSequence + * @memberof! proton.Message# + * @param {number} n the new group sequence for the message. + */ +_Message_['setGroupSequence'] = function(n) { + this._check(_pn_message_set_group_sequence(this._message, n)); +}; + +/** + * Get the reply to group id for a message. + * @method getReplyToGroupID + * @memberof! proton.Message# + * @returns {string} the reply to group id of the Message. + */ +_Message_['getReplyToGroupID'] = function() { + return Pointer_stringify(_pn_message_get_reply_to_group_id(this._message)); +}; + +/** + * Set the reply to group id for a message. + * @method setReplyToGroupID + * @memberof! proton.Message# + * @param {string} id the reply to group id we want to set for the Message. + */ +_Message_['setReplyToGroupID'] = function(id) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_reply_to_group_id(this._message, allocate(intArrayFromString(id), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + +/** + * The following methods are marked as deprecated and are not implemented. + * pn_message_get_format() + * pn_message_set_format() + * pn_message_load() + * pn_message_load_data() + * pn_message_load_text() + * pn_message_load_amqp() + * pn_message_load_json() + * pn_message_save() + * pn_message_save_data() + * pn_message_save_text() + * pn_message_save_amqp() + * pn_message_save_json() + * pn_message_data() + */ + +/** + * Return a Binary representation of the message encoded in AMQP format. N.B. the + * returned {@link proton.Data.Binary} "owns" the underlying raw data and is thus + * responsible for freeing it or passing it to a method that consumes a Binary + * such as {@link proton.Message.decode}. + * @method encode + * @memberof! proton.Message# + * @returns {proton.Data.Binary} a representation of the message encoded in AMQP format. + */ +_Message_['encode'] = function() { + this._preEncode(); + var size = 1024; + while (true) { + setValue(size, size, 'i32'); // Set pass by reference variable. + var bytes = _malloc(size); // Allocate storage from emscripten heap. + var err = _pn_message_encode(this._message, bytes, size); + var size = getValue(size, 'i32'); // Dereference the real size value; + + if (err === Module['Error']['OVERFLOW']) { + _free(bytes); + size *= 2; + } else if (err >= 0) { + return new Data['Binary'](size, bytes); + } else { + _free(bytes); + this._check(err); + return; + } + } +}; + +/** + * Decodes and loads the message content from supplied Binary AMQP data N.B. + * this method "consumes" data from a {@link proton.Data.Binary} in other words + * it takes responsibility for the underlying data and frees the raw data from + * the Binary. + * @method decode + * @memberof! proton.Message# + * @param {proton.Data.Binary} encoded the AMQP encoded binary message. + */ +_Message_['decode'] = function(encoded) { + var err = _pn_message_decode(this._message, encoded.start, encoded.size); + encoded['free'](); // Free the original Binary. + if (err >= 0) { + this._postDecode(); + } + this._check(err); +}; +
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/abd646b2/proton-c/bindings/javascript/messenger.js ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/messenger.js b/proton-c/bindings/javascript/messenger.js new file mode 100644 index 0000000..993670f --- /dev/null +++ b/proton-c/bindings/javascript/messenger.js @@ -0,0 +1,799 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/*****************************************************************************/ +/* */ +/* Messenger */ +/* */ +/*****************************************************************************/ + +/** + * Constructs a proton.Messenger instance giving it an (optional) name. If name + * is supplied that will be used as the name of the Messenger, otherwise a UUID + * will be used. The Messenger is initialised to non-blocking mode as it makes + * little sense to have blocking behaviour in a JavaScript implementation. + * @classdesc This class is + * @constructor proton.Messenger + * @param {string} name the name of this Messenger instance. + */ +Module['Messenger'] = function(name) { // Messenger Constructor. + /** + * The emscripten idiom below is used in a number of places in the JavaScript + * bindings to map JavaScript Strings to C style strings. ALLOC_STACK will + * increase the stack and place the item there. When the stack is next restored + * (by calling Runtime.stackRestore()), that memory will be automatically + * freed. In C code compiled by emscripten saving and restoring of the stack + * is automatic, but if we want to us ALLOC_STACK from native JavaScript we + * need to explicitly save and restore the stack using Runtime.stackSave() + * and Runtime.stackRestore() or we will leak emscripten heap memory. + * See https://github.com/kripken/emscripten/wiki/Interacting-with-code + * The _pn_messenger constructor copies the char* passed to it. + */ + var sp = Runtime.stackSave(); + this._messenger = _pn_messenger(name ? allocate(intArrayFromString(name), 'i8', ALLOC_STACK) : 0); + Runtime.stackRestore(sp); + + /** + * Initiate Messenger non-blocking mode. For JavaScript we make this the + * default behaviour and don't export this method because JavaScript is + * fundamentally an asynchronous non-blocking execution environment. + */ + _pn_messenger_set_blocking(this._messenger, false); + + // Subscriptions that haven't yet completed, used for managing subscribe events. + this._pendingSubscriptions = []; + + // Used in the Event registration mechanism (in the 'on' and 'emit' methods). + this._callbacks = {}; + + // This call ensures that the emscripten network callback functions are initialised. + Module.EventDispatch.registerMessenger(this); + + + // TODO improve error handling mechanism. + /* + * The emscripten websocket error event could get triggered by any Messenger + * and it's hard to determine which one without knowing which file descriptors + * are associated with which instance. As a workaround we set the _checkErrors + * flag when we call put or subscribe and reset it when work succeeds. + */ + this._checkErrors = false; + + /** + * TODO update to handle multiple Messenger instances + * Handle the emscripten websocket error and use it to trigger a MessengerError + * Note that the emscripten websocket error passes an array containing the + * file descriptor, the errno and the message, we just use the message here. + */ + var that = this; + Module['websocket']['on']('error', function(error) { + +console.log("Module['websocket']['on'] caller is " + arguments.callee.caller.toString()); + +console.log("that._checkErrors = " + that._checkErrors); +console.log("error = " + error); + if (that._checkErrors) { + that._emit('error', new Module['MessengerError'](error[2])); + } + }); +}; + +Module['Messenger'].PN_CUMULATIVE = 0x1; // Protected Class attribute. + +// Expose prototype as a variable to make method declarations less verbose. +var _Messenger_ = Module['Messenger'].prototype; + +// ************************* Protected methods ******************************** + +// We use the dot notation rather than associative array form for protected +// methods so they are visible to this "package", but the Closure compiler will +// minify and obfuscate names, effectively making a defacto "protected" method. + +/** + * This helper method checks the supplied error code, converts it into an + * exception and throws the exception. This method will try to use the message + * populated in pn_messenger_error(), if present, but if not it will fall + * back to using the basic error code rendering from pn_code(). + * @param code the error code to check. + */ +_Messenger_._check = function(code) { + if (code < 0) { + if (code === Module['Error']['INPROGRESS']) { + return code; + } + + var errno = this['getErrno'](); + var message = errno ? this['getError']() : Pointer_stringify(_pn_code(code)); + + if (this._callbacks['error']) { + this._emit('error', new Module['MessengerError'](message)); + } else { + throw new Module['MessengerError'](message); + } + } else { + return code; + } +}; + +/** + * Invokes the callbacks registered for a specified event. + * @method _emit + * @memberof! proton.Messenger# + * @param event {string} the event we want to emit. + * @param param {object} the parameter we'd like to pass to the event callback. + */ +_Messenger_._emit = function(event, param) { + var callbacks = this._callbacks[event]; + if (callbacks) { + for (var i = 0; i < callbacks.length; i++) { + var callback = callbacks[i]; + if ('function' === typeof callback) { + callback.call(this, param); + } + } + } +}; + +/** + * Checks any pending subscriptions and when a source address becomes available + * emit a subscription event passing the Subscription that triggered the event. + * Note that this doesn't seem to work for listen/bind style subscriptions, + * that is to say subscriptions of the form amqp://~0.0.0.0 don't know why? + */ +_Messenger_._checkSubscriptions = function() { + // Check for completed subscriptions, and emit subscribe event. + var subscriptions = this._pendingSubscriptions; + if (subscriptions.length) { + var pending = []; // Array of any subscriptions that remain pending. + for (var j = 0; j < subscriptions.length; j++) { + subscription = subscriptions[j]; + if (subscription['getAddress']()) { + this._emit('subscription', subscription); + } else { + pending.push(subscription); + } + } + this._pendingSubscriptions = pending; + } +}; + + +// *************************** Public methods ***************************** + +/** + * N.B. The following methods are not exported by the JavaScript Messenger + * binding for reasons described below. + * + * For these methods it is expected that security would be implemented via + * a secure WebSocket. TODO what happens if we decide to implement TCP sockets + * via Node.js net library. If we do that we may want to compile OpenSSL + * using emscripten and include these methods. + * pn_messenger_set_certificate() + * pn_messenger_get_certificate() + * pn_messenger_set_private_key() + * pn_messenger_get_private_key() + * pn_messenger_set_password() + * pn_messenger_get_password() + * pn_messenger_set_trusted_certificates() + * pn_messenger_get_trusted_certificates() + * + * For these methods the implementation is fairly meaningless because JavaScript + * is a fundamentally asynchronous non-blocking environment. + * pn_messenger_set_timeout() + * pn_messenger_set_blocking() + * pn_messenger_interrupt() + * pn_messenger_send() // Not sure if this is useful in JavaScript. + */ + +/** + * Registers a listener callback for a specified event. + * @method on + * @memberof! proton.Messenger# + * @param {string} event the event we want to listen for. + * @param {function} callback the callback function to be registered for the specified event. + */ +_Messenger_['on'] = function(event, callback) { + if ('function' === typeof callback) { + if (!this._callbacks[event]) { + this._callbacks[event] = []; + } + + this._callbacks[event].push(callback); + } +}; + +/** + * Removes a listener callback for a specified event. + * @method removeListener + * @memberof! proton.Messenger# + * @param {string} event the event we want to detach from. + * @param {function} callback the callback function to be removed for the specified event. + * if no callback is specified all callbacks are removed for the event. + */ +_Messenger_['removeListener'] = function(event, callback) { + if (callback) { + var callbacks = this._callbacks[event]; + if ('function' === typeof callback && callbacks) { + // Search for the specified callback. + for (var i = 0; i < callbacks.length; i++) { + if (callback === callbacks[i]) { + // If we find the specified callback delete it and return. + callbacks.splice(i, 1); + return; + } + } + } + } else { + // If we call remove with no callback we remove all callbacks. + delete this._callbacks[event]; + } +}; + +/** + * Retrieves the name of a Messenger. + * @method getName + * @memberof! proton.Messenger# + * @returns {string} the name of the messenger. + */ +_Messenger_['getName'] = function() { + return Pointer_stringify(_pn_messenger_name(this._messenger)); +}; + +/** + * Retrieves the timeout for a Messenger. + * @method getTimeout + * @memberof! proton.Messenger# + * @returns {number} zero because JavaScript is fundamentally non-blocking. + */ +_Messenger_['getTimeout'] = function() { + return 0; +}; + +/** + * Accessor for messenger blocking mode. + * @method isBlocking + * @memberof! proton.Messenger# + * @returns {boolean} false because JavaScript is fundamentally non-blocking. + */ +_Messenger_['isBlocking'] = function() { + return false; +}; + +/** + * Free the Messenger. This will close all connections that are managed + * by the Messenger. Call the stop method before destroying the Messenger. + * <p> + * N.B. This method has to be called explicitly in JavaScript as we can't + * intercept finalisers, so we need to remember to free before removing refs. + * @method free + * @memberof! proton.Messenger# + */ +_Messenger_['free'] = function() { + // This call ensures that the emscripten network callback functions are removed. + Module.EventDispatch.unregisterMessenger(this); + _pn_messenger_free(this._messenger); +}; + +/** + * @method getErrno + * @memberof! proton.Messenger# + * @returns {number} the most recent error message code. + */ +_Messenger_['getErrno'] = function() { + return _pn_messenger_errno(this._messenger); +}; + +/** + * @method getError + * @memberof! proton.Messenger# + * @returns {string} the most recent error message as a String. + */ +_Messenger_['getError'] = function() { + return Pointer_stringify(_pn_error_text(_pn_messenger_error(this._messenger))); +}; + +/** + * Returns the size of the outgoing window that was set with setOutgoingWindow. + * The default is 0. + * @method getOutgoingWindow + * @memberof! proton.Messenger# + * @returns {number} the outgoing window size. + */ +_Messenger_['getOutgoingWindow'] = function() { + return _pn_messenger_get_outgoing_window(this._messenger); +}; + +/** + * Sets the outgoing tracking window for the Messenger. The Messenger will + * track the remote status of this many outgoing deliveries after calling + * send. Defaults to zero. + * <p> + * A Message enters this window when you call put() with the Message. + * If your outgoing window size is n, and you call put() n+1 times, status + * information will no longer be available for the first Message. + * @method setOutgoingWindow + * @memberof! proton.Messenger# + * @param {number} window the size of the tracking window in messages. + */ +_Messenger_['setOutgoingWindow'] = function(window) { + this._check(_pn_messenger_set_outgoing_window(this._messenger, window)); +}; + +/** + * Returns the size of the incoming window that was set with setIncomingWindow. + * The default is 0. + * @method getIncomingWindow + * @memberof! proton.Messenger# + * @returns {number} the incoming window size. + */ +_Messenger_['getIncomingWindow'] = function() { + return _pn_messenger_get_incoming_window(this._messenger); +}; + +/** + * Sets the incoming tracking window for the Messenger. The Messenger will + * track the remote status of this many incoming deliveries after calling + * send. Defaults to zero. + * <p> + * Messages enter this window only when you take them into your application + * using get(). If your incoming window size is n, and you get() n+1 messages + * without explicitly accepting or rejecting the oldest message, then the + * Message that passes beyond the edge of the incoming window will be assigned + * the default disposition of its link. + * @method setIncomingWindow + * @memberof! proton.Messenger# + * @param {number} window the size of the tracking window in messages. + */ +_Messenger_['setIncomingWindow'] = function(window) { + this._check(_pn_messenger_set_incoming_window(this._messenger, window)); +}; + +/** + * Currently a no-op placeholder. For future compatibility, do not send or + * recv messages before starting the Messenger. + * @method start + * @memberof! proton.Messenger# + */ +_Messenger_['start'] = function() { + this._check(_pn_messenger_start(this._messenger)); +}; + +/** + * Transitions the Messenger to an inactive state. An inactive Messenger + * will not send or receive messages from its internal queues. A Messenger + * should be stopped before being discarded to ensure a clean shutdown + * handshake occurs on any internally managed connections. + * <p> + * The Messenger may require some time to stop if it is busy, and in that + * case will return {@link proton.Error.INPROGRESS}. In that case, call isStopped + * to see if it has fully stopped. + * @method stop + * @memberof! proton.Messenger# + * @returns {@link proton.Error.INPROGRESS} if still busy. + */ +_Messenger_['stop'] = function() { + return this._check(_pn_messenger_stop(this._messenger)); +}; + +/** + * Returns true iff a Messenger is in the stopped state. + * @method isStopped + * @memberof! proton.Messenger# + * @returns {boolean} true iff a Messenger is in the stopped state. + */ +_Messenger_['isStopped'] = function() { + return (_pn_messenger_stopped(this._messenger) > 0); +}; + +/** + * Subscribes the Messenger to messages originating from the + * specified source. The source is an address as specified in the + * Messenger introduction with the following addition. If the + * domain portion of the address begins with the '~' character, the + * Messenger will interpret the domain as host/port, bind to it, + * and listen for incoming messages. For example "~0.0.0.0", + * "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any + * local interface and listen for incoming messages with the last + * variant only permitting incoming SSL connections. + * @method subscribe + * @memberof! proton.Messenger# + * @param {string} source the source address we're subscribing to. + * @returns {Subscription} a subscription. + */ +_Messenger_['subscribe'] = function(source) { + if (!source) { + this._check(Module['Error']['ARG_ERR']); + } + var sp = Runtime.stackSave(); + this._checkErrors = true; // TODO improve error handling mechanism. + var subscription = _pn_messenger_subscribe(this._messenger, + allocate(intArrayFromString(source), 'i8', ALLOC_STACK)); + Runtime.stackRestore(sp); + + if (!subscription) { + this._check(Module['Error']['ERR']); + } + + subscription = new Subscription(subscription) + this._pendingSubscriptions.push(subscription); + return subscription; +}; + +/** + * Places the content contained in the message onto the outgoing queue + * of the Messenger. This method will never block, however it will send any + * unblocked Messages in the outgoing queue immediately and leave any blocked + * Messages remaining in the outgoing queue. The outgoing property may be + * used to check the depth of the outgoing queue. + * <p> + * When the content in a given Message object is copied to the outgoing + * message queue, you may then modify or discard the Message object + * without having any impact on the content in the outgoing queue. + * <p> + * This method returns an outgoing tracker for the Message. The tracker + * can be used to determine the delivery status of the Message. + * @method put + * @memberof! proton.Messenger# + * @param {proton.Message} message a Message to send. + * @param {boolean} flush if this is set true or is undefined then messages are + * flushed (this is the default). If explicitly set to false then messages + * may not be sent immediately and might require an explicit call to work(). + * This may be used to "batch up" messages and *may* be more efficient. + * @returns {proton.Data.Long} a tracker. + */ +_Messenger_['put'] = function(message, flush) { + flush = flush === false ? false : true; + message._preEncode(); + this._checkErrors = true; // TODO improve error handling mechanism. + this._check(_pn_messenger_put(this._messenger, message._message)); + + // If flush is set invoke pn_messenger_work. + if (flush) { + _pn_messenger_work(this._messenger, 0); + } + + // Getting the tracker is a little tricky as it is a 64 bit number. The way + // emscripten handles this is to return the low 32 bits directly and pass + // the high 32 bits via the tempRet0 variable. We use Data.Long to pass the + // low/high pair around to methods that require a tracker. + var low = _pn_messenger_outgoing_tracker(this._messenger); + var high = Runtime.getTempRet0(); + return new Data.Long(low, high); +}; + +/** + * Gets the last known remote state of the delivery associated with the given tracker. + * @method status + * @memberof! proton.Messenger# + * @param {proton.Data.Long} tracker the tracker whose status is to be retrieved. + * @returns {proton.Status} one of None, PENDING, REJECTED, or ACCEPTED. + */ +_Messenger_['status'] = function(tracker) { + if (tracker == null) { + var low = _pn_messenger_outgoing_tracker(this._messenger); + var high = Runtime.getTempRet0(); + tracker = new Data.Long(low, high); + } + + return _pn_messenger_status(this._messenger, tracker.getLowBitsUnsigned(), tracker.getHighBits()); +}; + +/** + * Checks if the delivery associated with the given tracker is still waiting to be sent. + * @method isBuffered + * @memberof! proton.Messenger# + * @param {proton.Data.Long} tracker the tracker identifying the delivery. + * @returns {boolean} true if delivery is still buffered. + */ +_Messenger_['isBuffered'] = function(tracker) { + if (tracker == null) { + var low = _pn_messenger_outgoing_tracker(this._messenger); + var high = Runtime.getTempRet0(); + tracker = new Data.Long(low, high); + } + + return (_pn_messenger_buffered(this._messenger, tracker.getLowBitsUnsigned(), tracker.getHighBits()) > 0); +}; + +/** + * Frees a Messenger from tracking the status associated with a given tracker. + * If you don't supply a tracker, all outgoing messages up to the most recent + * will be settled. + * @method settle + * @memberof! proton.Messenger# + * @param {proton.Data.Long} tracker the tracker identifying the delivery. + */ +_Messenger_['settle'] = function(tracker) { + // Getting the tracker is a little tricky as it is a 64 bit number. The way + // emscripten handles this is to return the low 32 bits directly and pass + // the high 32 bits via the tempRet0 variable. We use Data.Long to pass the + // low/high pair around to methods that require a tracker. + var flags = 0; + if (tracker == null) { + var low = _pn_messenger_outgoing_tracker(this._messenger); + var high = Runtime.getTempRet0(); + tracker = new Data.Long(low, high); + flags = Module['Messenger'].PN_CUMULATIVE; + } + + this._check(_pn_messenger_settle(this._messenger, tracker.getLowBitsUnsigned(), tracker.getHighBits(), flags)); +}; + +/** + * Sends or receives any outstanding messages queued for a Messenger. + * For JavaScript the only timeout that makes sense is 0 (do not block). + * This method may also do I/O work other than sending and receiving messages. + * For example, closing connections after messenger.stop() has been called. + * @method work + * @memberof! proton.Messenger# + * @returns {boolean} true if there is work still to do, false otherwise. + */ +_Messenger_['work'] = function() { + var err = _pn_messenger_work(this._messenger, 0); + if (err === Module['Error']['TIMEOUT']) { +console.log("work = false"); + return false; + } else { + this._checkErrors = false; // TODO improve error handling mechanism. + this._check(err); +console.log("work = true"); + return true; + } +}; + +/** + * Receives up to limit messages into the incoming queue. If no value for limit + * is supplied, this call will receive as many messages as it can buffer internally. + * @method recv + * @memberof! proton.Messenger# + * @param {number} limit the maximum number of messages to receive or -1 to to receive + * as many messages as it can buffer internally. + */ +_Messenger_['recv'] = function(limit) { + this._check(_pn_messenger_recv(this._messenger, (limit ? limit : -1))); +}; + +/** + * Returns the capacity of the incoming message queue of messenger. Note this + * count does not include those messages already available on the incoming queue. + * @method receiving + * @memberof! proton.Messenger# + * @returns {number} the message queue capacity. + */ +_Messenger_['receiving'] = function() { + return _pn_messenger_receiving(this._messenger); +}; + +/** + * Moves the message from the head of the incoming message queue into the + * supplied message object. Any content in the message will be overwritten. + * <p> + * A tracker for the incoming Message is returned. The tracker can later be + * used to communicate your acceptance or rejection of the Message. + * @method get + * @memberof! proton.Messenger# + * @param {proton.Message} message the destination message object. If no Message + * object is supplied, the Message popped from the head of the queue is discarded. + * @param {boolean} decodeBinaryAsString if set decode any AMQP Binary payload + * objects as strings. This can be useful as the data in Binary objects + * will be overwritten with subsequent calls to get, so they must be + * explicitly copied. Needless to say it is only safe to set this flag if + * you know that the data you are dealing with is actually a string, for + * example C/C++ applications often seem to encode strings as AMQP binary, + * a common cause of interoperability problems. + * @returns {proton.Data.Long} a tracker for the incoming Message. + */ +_Messenger_['get'] = function(message, decodeBinaryAsString) { + var impl = null; + if (message) { + impl = message._message; + } + + this._check(_pn_messenger_get(this._messenger, impl)); + + if (message) { + message._postDecode(decodeBinaryAsString); + } + + // Getting the tracker is a little tricky as it is a 64 bit number. The way + // emscripten handles this is to return the low 32 bits directly and pass + // the high 32 bits via the tempRet0 variable. We use Data.Long to pass the + // low/high pair around to methods that require a tracker. + var low = _pn_messenger_incoming_tracker(this._messenger); + var high = Runtime.getTempRet0(); + + return new Data.Long(low, high); +}; + +/** + * Returns the Subscription of the Message returned by the most recent call + * to get, or null if pn_messenger_get has not yet been called. + * @method incomingSubscription + * @memberof! proton.Messenger# + * @returns {Subscription} a Subscription or null if get has never been called + * for this Messenger. + */ +_Messenger_['incomingSubscription'] = function() { + var subscription = _pn_messenger_incoming_subscription(this._messenger); + if (subscription) { + return new Subscription(subscription); + } else { + return null; + } +}; + +/** + * Signal the sender that you have acted on the Message pointed to by the tracker. + * If no tracker is supplied, then all messages that have been returned by the + * get method are accepted, except those that have already been auto-settled + * by passing beyond your incoming window size. + * @method accept + * @memberof! proton.Messenger# + * @param {proton.Data.Long} tracker the tracker identifying the delivery. + */ +_Messenger_['accept'] = function(tracker) { + // Getting the tracker is a little tricky as it is a 64 bit number. The way + // emscripten handles this is to return the low 32 bits directly and pass + // the high 32 bits via the tempRet0 variable. We use Data.Long to pass the + // low/high pair around to methods that require a tracker. + var flags = 0; + if (tracker == null) { + var low = _pn_messenger_incoming_tracker(this._messenger); + var high = Runtime.getTempRet0(); + tracker = new Data.Long(low, high); + flags = Module['Messenger'].PN_CUMULATIVE; + } + + this._check(_pn_messenger_accept(this._messenger, tracker.getLowBitsUnsigned(), tracker.getHighBits(), flags)); +}; + +/** + * Rejects the Message indicated by the tracker. If no tracker is supplied, + * all messages that have been returned by the get method are rejected, except + * those already auto-settled by passing beyond your outgoing window size. + * @method reject + * @memberof! proton.Messenger# + * @param {proton.Data.Long} tracker the tracker identifying the delivery. + */ +_Messenger_['reject'] = function(tracker) { + // Getting the tracker is a little tricky as it is a 64 bit number. The way + // emscripten handles this is to return the low 32 bits directly and pass + // the high 32 bits via the tempRet0 variable. We use Data.Long to pass the + // low/high pair around to methods that require a tracker. + var flags = 0; + if (tracker == null) { + var low = _pn_messenger_incoming_tracker(this._messenger); + var high = Runtime.getTempRet0(); + tracker = new Data.Long(low, high); + flags = Module['Messenger'].PN_CUMULATIVE; + } + + this._check(_pn_messenger_reject(this._messenger, tracker.getLowBitsUnsigned(), tracker.getHighBits(), flags)); +}; + +/** + * Returns the number of messages in the outgoing message queue of a messenger. + * @method outgoing + * @memberof! proton.Messenger# + * @returns {number} the outgoing queue depth. + */ +_Messenger_['outgoing'] = function() { + return _pn_messenger_outgoing(this._messenger); +}; + +/** + * Returns the number of messages in the incoming message queue of a messenger. + * @method incoming + * @memberof! proton.Messenger# + * @returns {number} the incoming queue depth. + */ +_Messenger_['incoming'] = function() { + return _pn_messenger_incoming(this._messenger); +}; + +/** + * Adds a routing rule to a Messenger's internal routing table. + * <p> + * The route method may be used to influence how a messenger will internally treat + * a given address or class of addresses. Every call to the route method will + * result in messenger appending a routing rule to its internal routing table. + * <p> + * Whenever a message is presented to a messenger for delivery, it will match the + * address of this message against the set of routing rules in order. The first + * rule to match will be triggered, and instead of routing based on the address + * presented in the message, the messenger will route based on the address supplied + * in the rule. + * <p> + * The pattern matching syntax supports two types of matches, a '' will match any + * character except a '/', and a '*' will match any character including a '/'. + * <p> + * A routing address is specified as a normal AMQP address, however it may + * additionally use substitution variables from the pattern match that triggered + * the rule. + * <p> + * Any message sent to "foo" will be routed to "amqp://foo.com": + * <pre> + * route("foo", "amqp://foo.com"); + * </pre> + * Any message sent to "foobar" will be routed to "amqp://foo.com/bar": + * <pre> + * route("foobar", "amqp://foo.com/bar"); + * </pre> + * Any message sent to bar/<path> will be routed to the corresponding path within + * the amqp://bar.com domain: + * <pre> + * route("bar/*", "amqp://bar.com/$1"); + * </pre> + * Supply credentials for foo.com: + * <pre> + * route("amqp://foo.com/*", "amqp://user:[email protected]/$1"); + * </pre> + * Supply credentials for all domains: + * <pre> + * route("amqp://*", "amqp://user:password@$1"); + * </pre> + * Route all addresses through a single proxy while preserving the original destination: + * <pre> + * route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); + * </pre> + * Route any address through a single broker: + * <pre> + * route("*", "amqp://user:password@broker/$1"); + * </pre> + * @method route + * @memberof! proton.Messenger# + * @param {string} pattern a glob pattern to select messages. + * @param {string} address an address indicating outgoing address rewrite. + */ +_Messenger_['route'] = function(pattern, address) { + var sp = Runtime.stackSave(); + this._check(_pn_messenger_route(this._messenger, + allocate(intArrayFromString(pattern), 'i8', ALLOC_STACK), + allocate(intArrayFromString(address), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + +/** + * Rewrite message addresses prior to transmission. + * <p> + * Similar to route(), except that the destination of the Message is determined + * before the message address is rewritten. + * <p> + * The outgoing address is only rewritten after routing has been finalized. If + * a message has an outgoing address of "amqp://0.0.0.0:5678", and a rewriting + * rule that changes its outgoing address to "foo", it will still arrive at the + * peer that is listening on "amqp://0.0.0.0:5678", but when it arrives there, + * the receiver will see its outgoing address as "foo". + * <p> + * The default rewrite rule removes username and password from addresses + * before they are transmitted. + * @method rewrite + * @memberof! proton.Messenger# + * @param {string} pattern a glob pattern to select messages. + * @param {string} address an address indicating outgoing address rewrite. + */ +_Messenger_['rewrite'] = function(pattern, address) { + var sp = Runtime.stackSave(); + this._check(_pn_messenger_rewrite(this._messenger, + allocate(intArrayFromString(pattern), 'i8', ALLOC_STACK), + allocate(intArrayFromString(address), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/abd646b2/proton-c/bindings/javascript/module.js ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/module.js b/proton-c/bindings/javascript/module.js new file mode 100644 index 0000000..0fdb803 --- /dev/null +++ b/proton-c/bindings/javascript/module.js @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * This file provides a JavaScript wrapper around the Proton Messenger API. + * It will be used to wrap the emscripten compiled proton-c code and be minified by + * the Closure compiler, so all comments will be stripped from the actual library. + * <p> + * This JavaScript wrapper provides a somewhat more idiomatic object oriented + * interface which abstracts the low-level emscripten based implementation details + * from client code. Any similarities to the Proton Python binding are deliberate. + * @file + */ + +/** + * The Module Object is exported by emscripten for all execution platforms, we + * use it as a namespace to allow us to selectively export only what we wish to + * be publicly visible from this package/module. + * <p> + * Internally the binding code uses the associative array form for declaring + * exported properties to prevent the Closure compiler from minifying e.g. + * <pre>Module['Messenger'] = ...</pre> + * Exported Objects can be used in client code using a more convenient namespace, e.g.: + * <pre> + * proton = require('qpid-proton'); + * var messenger = new proton.Messenger(); + * var message = new proton.Message(); + * </pre> + * @namespace proton + */ + +var Module = { + // Prevent emscripten runtime exiting, we will be enabling network callbacks. + 'noExitRuntime' : true, +}; + + +/*****************************************************************************/ +/* */ +/* EventDispatch */ +/* */ +/*****************************************************************************/ + +/** + * EventDispatch is a Singleton class that allows callbacks to be registered which + * will get triggered by the emscripten WebSocket network callbacks. Clients of + * Messenger will register callbacks by calling: + * <pre> + * messenger.on('work', <callback function>); + * </pre> + * EventDispatch supports callback registration from multiple Messenger instances. + * The client callbacks will actually be called when a given messenger has work + * available or a WebSocket close has been occurred (in which case all registered + * callbacks will be called). + * <p> + * The approach implemented here allows the registered callbacks to follow a + * similar pattern to _process_incoming and _process_outgoing in async.py + * @memberof proton + */ +Module.EventDispatch = new function() { // Note the use of new to create a Singleton. + var _firstCall = true; // Flag used to check the first time registerMessenger is called. + var _messengers = {}; + + /** + * Provides functionality roughly equivalent to the following C code: + * while (1) { + * pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity. + * process(); + * } + * The blocking call isn't viable in JavaScript as it is entirely asynchronous + * and we wouldn't want to replace the while(1) with a timed loop either!! + * This method gets triggered asynchronously by the emscripten socket events and + * we then perform an equivalent loop for each messenger, triggering every + * registered callback whilst there is work remaining. If triggered by close + * we bypass the _pn_messenger_work test as it will never succeed after closing. + */ + var _pump = function(fd, closing) { + for (var i in _messengers) { + if (_messengers.hasOwnProperty(i)) { + var messenger = _messengers[i]; + + if (closing) { + messenger._emit('work'); + } else { + while (_pn_messenger_work(messenger._messenger, 0) >= 0) { + messenger._checkSubscriptions(); + messenger._checkErrors = false; // TODO improve error handling mechanism. + messenger._emit('work'); + } + } + } + } + }; + + /** + * Listener for the emscripten socket close event. Delegates to _pump() + * passing a flag to indicate that the socket is closing. + */ + var _close = function(fd) { + _pump(fd, true); + }; + + /** + * Register the specified Messenger as being interested in network events. + */ + this.registerMessenger = function(messenger) { + if (_firstCall) { + /** + * Initialises the emscripten network callback functions. This needs + * to be done the first time we call registerMessenger rather than + * when we create the Singleton because emscripten's socket filesystem + * has to be mounted before can listen for any of these events. + */ + Module['websocket']['on']('open', _pump); + Module['websocket']['on']('connection', _pump); + Module['websocket']['on']('message', _pump); + Module['websocket']['on']('close', _close); + _firstCall = false; + } + + var name = messenger.getName(); + _messengers[name] = messenger; + }; + + /** + * Unregister the specified Messenger from interest in network events. + */ + this.unregisterMessenger = function(messenger) { + var name = messenger.getName(); + delete _messengers[name]; + }; +}; + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/abd646b2/proton-c/bindings/javascript/subscription.js ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/subscription.js b/proton-c/bindings/javascript/subscription.js new file mode 100644 index 0000000..83ede80 --- /dev/null +++ b/proton-c/bindings/javascript/subscription.js @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/*****************************************************************************/ +/* */ +/* Subscription */ +/* */ +/*****************************************************************************/ + +/** + * Constructs a Subscription instance. + * @classdesc This class is a wrapper for Messenger's subscriptions. + * Subscriptions should never be *directly* instantiated by client code only via + * Messenger.subscribe() or Messenger.incomingSubscription(), so we declare the + * constructor in the scope of the package and don't export it via Module. + * @constructor Subscription + */ +var Subscription = function(subscription) { // Subscription Constructor. + this._subscription = subscription; +}; + +/** + * TODO Not sure exactly what pn_subscription_get_context does. + * @method getContext + * @memberof! Subscription# + * @returns the Subscription's Context. + */ +Subscription.prototype['getContext'] = function() { + return _pn_subscription_get_context(this._subscription); +}; + +/** + * TODO Not sure exactly what pn_subscription_set_context does. + * @method setContext + * @memberof! Subscription# + * @param context the Subscription's new Context. + */ +Subscription.prototype['setContext'] = function(context) { + _pn_subscription_set_context(this._subscription, context); +}; + +/** + * @method getAddress + * @memberof! Subscription# + * @returns the Subscription's Address. + */ +Subscription.prototype['getAddress'] = function() { + return Pointer_stringify(_pn_subscription_address(this._subscription)); +}; + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
