http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/examples/messenger/javascript/server.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/server.js b/examples/messenger/javascript/server.js new file mode 100644 index 0000000..cce8aa3 --- /dev/null +++ b/examples/messenger/javascript/server.js @@ -0,0 +1,79 @@ +#!/usr/bin/env node +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Simple server for use with client.js illustrating request/response + +// Check if the environment is Node.js and if so import the required library. +if (typeof exports !== "undefined" && exports !== null) { + proton = require("qpid-proton"); +} + +var address = "amqp://~0.0.0.0"; +var message = new proton.Message(); +var reply = new proton.Message(); +var messenger = new proton.Messenger(); + +var dispatch = function(request, response) { + var subject = request.getSubject(); + if (subject) { + response.setSubject('Re: ' + subject); + } + response.properties = request.properties + console.log("Dispatched " + subject + " " + JSON.stringify(request.properties)); +}; + +var pumpData = function() { + while (messenger.incoming()) { + var t = messenger.get(message); + + var replyTo = message.getReplyTo(); + if (replyTo) { + console.log(replyTo); + reply.setAddress(replyTo); + reply.setCorrelationID(message.getCorrelationID()); + reply.body = message.body; + dispatch(message, reply); + messenger.put(reply); + } + + messenger.accept(t); + } +}; + +var args = process.argv.slice(2); +if (args.length > 0) { + if (args[0] === '-h' || args[0] === '--help') { + console.log("Usage: node server.js <addr> (default " + address + ")"); + process.exit(0); + } + + address = args[0]; +} + +messenger.setIncomingWindow(1024); + +messenger.on('error', function(error) {console.log(error);}); +messenger.on('work', pumpData); +messenger.start(); + +messenger.subscribe(address); +messenger.recv(); // Receive as many messages as messenger can buffer. +
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/examples/messenger/javascript/spout.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/spout.js b/examples/messenger/javascript/spout.js index cfb9351..1219627 100644 --- a/examples/messenger/javascript/spout.js +++ b/examples/messenger/javascript/spout.js @@ -1,3 +1,4 @@ +#!/usr/bin/env node /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -23,102 +24,95 @@ if (typeof exports !== "undefined" && exports !== null) { proton = require("qpid-proton"); } -try { - var address = "amqp://0.0.0.0"; - var subject = "UK.WEATHER"; - var msgtext = "Hello World!"; - var tracker = null; - var running = true; +console.log("spout not implemented yet"); +process.exit(0); - var message = new proton.Message(); - var messenger = new proton.Messenger(); +var address = "amqp://0.0.0.0"; +var subject = "UK.WEATHER"; +var msgtext = "Hello World!"; +var tracker = null; +var running = true; - function _process() { -// console.log(" *** process ***"); +var message = new proton.Message(); +var messenger = new proton.Messenger(); - // Process outgoing messages - var status = messenger.status(tracker); - if (status != proton.Status.PENDING) { +function pumpData() { + var status = messenger.status(tracker); + if (status != proton.Status.PENDING) { console.log("status = " + status); - //messenger.settle(tracker); - //tracked--; - - if (running) { + if (running) { console.log("stopping"); - messenger.stop(); - running = false; - } - } + messenger.stop(); + running = false; + } + } - if (messenger.isStopped()) { + if (messenger.isStopped()) { console.log("exiting"); - message.free(); - messenger.free(); - //exit(0); - } - }; - - messenger.setOutgoingWindow(1024); + message.free(); + messenger.free(); + } +}; - messenger.setNetworkCallback(_process); - messenger.start(); +messenger.on('error', function(error) {console.log(error);}); +messenger.on('work', pumpData); +messenger.setOutgoingWindow(1024); +messenger.start(); - message.setAddress(address); - message.setSubject(subject); +message.setAddress(address); +message.setSubject(subject); - //message.body = msgtext; - //message.body = new proton.Data.Uuid(); - //message.body = new proton.Data.Symbol("My Symbol"); - //message.body = new proton.Data.Binary("Monkey BathпогÑÐ¾Ð¼Ð·Ñ Ñвбнм"); - //message.body = new proton.Data.Described("persian", "feline mammals"); +//message.body = msgtext; +//message.body = new proton.Data.Uuid(); +//message.body = new proton.Data.Symbol("My Symbol"); +message.body = new proton.Data.Binary("Monkey BathпогÑÐ¾Ð¼Ð·Ñ Ñвбнм"); +//message.body = new proton.Data.Described("persian", "feline mammals"); - //message.body = new Date(); +//message.body = new Date(); - //message.body = new proton.Data.Array('INT', [1, 3, 5, 7], "odd numbers"); +//message.body = new proton.Data.Array('INT', [1, 3, 5, 7], "odd numbers"); - //message.body = new proton.Data.Array('UINT', [1, 3, 5, 7], "odd"); - //message.body = new proton.Data.Array('ULONG', [1, 3, 5, 7], "odd"); - //message.body = new proton.Data.Array('FLOAT', [1, 3, 5, 7], "odd"); - //message.body = new proton.Data.Array('STRING', ["1", "3", "5", "7"], "odd"); +//message.body = new proton.Data.Array('UINT', [1, 3, 5, 7], "odd"); +//message.body = new proton.Data.Array('ULONG', [1, 3, 5, 7], "odd"); +//message.body = new proton.Data.Array('FLOAT', [1, 3, 5, 7], "odd"); +//message.body = new proton.Data.Array('STRING', ["1", "3", "5", "7"], "odd"); - //message.body = new Uint8Array([1, 3, 5, 7]); +//message.body = new Uint8Array([1, 3, 5, 7]); - //message.body = new proton.Data.Array('UINT', new Uint8Array([1, 3, 5, 7]), "odd"); +//message.body = new proton.Data.Array('UINT', new Uint8Array([1, 3, 5, 7]), "odd"); - //message.body = new proton.Data.Array('UUID', [new proton.Data.Uuid(), new proton.Data.Uuid(), new proton.Data.Uuid(), new proton.Data.Uuid()], "unique"); +//message.body = new proton.Data.Array('UUID', [new proton.Data.Uuid(), new proton.Data.Uuid(), new proton.Data.Uuid(), new proton.Data.Uuid()], "unique"); - /*message.body = new proton.Data.Binary(4); - var buffer = message.body.getBuffer(); - buffer[0] = 65; - buffer[1] = 77; - buffer[2] = 81; - buffer[3] = 80;*/ - message.body = new proton.Data.Binary([65, 77, 81, 80]); +/*message.body = new proton.Data.Binary(4); +var buffer = message.body.getBuffer(); +buffer[0] = 65; +buffer[1] = 77; +buffer[2] = 81; +buffer[3] = 80;*/ +//message.body = new proton.Data.Binary([65, 77, 81, 80]); +//message.body = new proton.Data.Binary(2485); +//message.body = new proton.Data.Binary(100000); - //message.body = null; - //message.body = true; - //message.body = 66..char(); - //message.body = " \"127.0\" "; +//message.body = null; +//message.body = true; +//message.body = 66..char(); +//message.body = " \"127.0\" "; - //message.body = 2147483647; // int - //message.body = -2147483649; // long - //message.body = 12147483649; // long - //message.body = (12147483649).long(); // long - //message.body = (-12147483649).ulong(); // long - //message.body = (17223372036854778000).ulong(); // ulong +//message.body = 2147483647; // int +//message.body = -2147483649; // long +//message.body = 12147483649; // long +//message.body = (12147483649).long(); // long +//message.body = (-12147483649).ulong(); // long +//message.body = (17223372036854778000).ulong(); // ulong - //message.body = (121474.836490).float(); // float TODO check me - //message.body = 12147483649.0.float(); // float TODO check me - //message.body = (4294967296).uint(); - //message.body = (255).ubyte(); +//message.body = (121474.836490).float(); // float TODO check me +//message.body = 12147483649.0.float(); // float TODO check me +//message.body = (4294967296).uint(); +//message.body = (255).ubyte(); - //message.body = ['Rod', 'Jane', 'Freddy']; - //message.body = ['Rod', 'Jane', 'Freddy', {cat: true, donkey: 'hee haw'}]; - //message.body = {cat: true, donkey: 'hee haw'}; +//message.body = ['Rod', 'Jane', 'Freddy']; +//message.body = ['Rod', 'Jane', 'Freddy', {cat: true, donkey: 'hee haw'}]; - tracker = messenger.put(message); +tracker = messenger.put(message); -} catch(e) { - console.log("Caught Exception " + e); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/examples/messenger/javascript/ws2tcp.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/ws2tcp.js b/examples/messenger/javascript/ws2tcp.js new file mode 100755 index 0000000..abb78f2 --- /dev/null +++ b/examples/messenger/javascript/ws2tcp.js @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * ws2tcp.js is a simple node.js library that proxies from a WebSocket to a TCP + * Socket or vice versa. It has minimal dependencies - the standard node.js net + * library and the ws WebSocket library (npm install ws). + * <p> + * Two fuctions are exported, ws2tcp proxies from a WebSocket to a TCP Socket and + * tcp2ws proxies from a TCP Socket to a WebSocket. + * @Author Fraser Adams + * @file + */ + +var WebSocket = require('ws'); +var net = require('net'); + +/** + * This function is shared by ws2tcp and tcp2ws and takes care of cleaning up + * and closing the WebSocket and Socket when things close down or error. + * @param sock the TCP Socket instance we're registering cleanup handlers for. + * @param ws the WebSocket instance we're registering cleanup handlers for. + */ +var registerCleanupCallbacks = function(sock, ws) { + var cleanup = function(sock, ws) { + sock.removeAllListeners('close'); + sock.end(); + ws.removeAllListeners('close'); + ws.close(); + }; + + sock.on('close', function() { + cleanup(sock, ws); + }); + + sock.on('error', function (e) { + console.log("socket error: " + e.code); + cleanup(sock, ws); + }); + + ws.on('close', function() { + cleanup(sock, ws); + }); + + ws.on('error', function (e) { + console.log("websocket error: " + e.code); + cleanup(sock, ws); + }); +}; + +/** + * This function establishes a proxy that listens on a specified TCP Socket port + * and proxies data to a WebSocket on the target host listening on the specified + * target port. + * @param lport the listen port. + * @param thost the target host. + * @param tport the target port. + * @param subProtocols a string containing a comma separated list of WebSocket sub-protocols. + */ +var tcp2ws = function(lport, thost, tport, subProtocols) { + var opts = null; + if (subProtocols) { + // The regex trims the string (removes spaces at the beginning and end, + // then splits the string by <any space>,<any space> into an Array. + subProtocols = subProtocols.replace(/^ +| +$/g,"").split(/ *, */); + opts = {'protocol': subProtocols.toString()}; + } + + var server = net.createServer(function(sock) { + var url = 'ws://' + thost + ':' + tport; + var ws = new WebSocket(url, opts); + var ready = false; + var buffer = []; + + registerCleanupCallbacks(sock, ws); + + sock.on('data', function(data) { + if (ready) { + ws.send(data); + } else { + buffer.push(data); + } + }); + + ws.on('open', function () { + if (buffer.length > 0) { + ws.send(Buffer.concat(buffer)); + } + ready = true; + buffer = null; + }); + + ws.on('message', function(m) { + sock.write(m); + }); + }); + server.listen(lport); +}; + +/** + * This function establishes a proxy that listens on a specified WebSocket port + * and proxies data to a TCP Socket on the target host listening on the specified + * target port. + * @param lport the listen port. + * @param thost the target host. + * @param tport the target port. + */ +var ws2tcp = function(lport, thost, tport) { + var server = new WebSocket.Server({port: lport}); + server.on('connection', function(ws) { + var sock = net.connect(tport, thost); + var ready = false; + var buffer = []; + + registerCleanupCallbacks(sock, ws); + + ws.on('message', function(m) { + if (ready) { + sock.write(m); + } else { + buffer.push(m); + } + }); + + sock.on('connect', function() { + if (buffer.length > 0) { + sock.write(Buffer.concat(buffer)); + } + ready = true; + buffer = null; + }); + + sock.on('data', function(data) { + ws.send(data); + }); + }); + server.on('error', function(e) { + console.log("websocket server error: " + e.code); + }); +}; + +// Export the two proxy functions. +module.exports.ws2tcp = ws2tcp; +module.exports.tcp2ws = tcp2ws; + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/proton-c/bindings/javascript/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/CMakeLists.txt b/proton-c/bindings/javascript/CMakeLists.txt index ef6d384..4e4dc0f 100644 --- a/proton-c/bindings/javascript/CMakeLists.txt +++ b/proton-c/bindings/javascript/CMakeLists.txt @@ -186,8 +186,6 @@ target_link_libraries(send-async.js qpid-proton-bitcode) add_executable(recv-async.js ${PN_PATH}/../examples/messenger/c/recv-async.c) target_link_libraries(recv-async.js qpid-proton-bitcode) -# TODO get the patches in my-library.js pushed properly into emscripten ASAP -# set_target_properties( send-async.js recv-async.js PROPERTIES @@ -196,16 +194,16 @@ set_target_properties( DEPENDS ws # This build shows socket messages - useful for debugging. - #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -s SOCKET_DEBUG=1 --js-library ${CMAKE_CURRENT_SOURCE_DIR}/my-library.js" + #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -s SOCKET_DEBUG=1" # Optimised build - takes somewhat longer to build. - #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1 --js-library ${CMAKE_CURRENT_SOURCE_DIR}/my-library.js" + #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1" # This build shows up emscripten warnings when building - should be able to remove it. - #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -s VERBOSE=1 -O2 --js-library ${CMAKE_CURRENT_SOURCE_DIR}/my-library.js" + #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -s VERBOSE=1 -O2" # This build is optimised but not minified - LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --js-library ${CMAKE_CURRENT_SOURCE_DIR}/my-library.js" + LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2" ) @@ -219,10 +217,16 @@ set_target_properties( PROPERTIES COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_PLATFORM_FLAGS}" - # This build is optimised and minified - #LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1 --pre-js + # This build is optimised and minified. The --memory-init-file 0 stops emscripten + # emitting a separate memory initialization file, if this was enabled it makes + # packaging harder as applications would expect proton.js.mem to be served too. + # It's even more fiddly with node.js packages. This behaviour might be reinstated + # if the packaging mechanism improves. + + # --js-library ${CMAKE_CURRENT_SOURCE_DIR}/my-library.js + #LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1 --memory-init-file 0 --pre-js - LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js --js-library ${CMAKE_CURRENT_SOURCE_DIR}/my-library.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_test', '_uuid_generate', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_work', '_pn_messenger_r ecv', '_pn_messenger_receiving', '_pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming', '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_subscription_address', '_pn_message', '_pn_message_free', '_pn_message_get_address', '_pn_message_errno', '_pn_message_error', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array' , '_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_pn_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_dec imal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump']\"" + LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1 --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_test', '_uuid_generate', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_work', '_pn_messenger_recv', '_pn_messenger_receiving', ' _pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming', '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_subscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_messa ge_get_reply_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_set_creation_time', '_pn_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_pn_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_deci mal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump']\"" ) # This command packages up the compiled proton.js into a node.js package called http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/proton-c/bindings/javascript/TODO ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/TODO b/proton-c/bindings/javascript/TODO index e717410..bd1d158 100644 --- a/proton-c/bindings/javascript/TODO +++ b/proton-c/bindings/javascript/TODO @@ -1,31 +1,11 @@ Qpid Proton JavaScript Language Bindings TODO List ================================================== -This is still largely a Proof of Concept, but it builds cleanly from an unmodified proton-c -code base, so for the most part it feels like a really neat way to generate JavaScript bindings. - -The main TODO is to actually create JavaScript bindings :-) - -At the moment the send-async.js and recv-async.js are actually compiled to JavaScript from -send-async.c and recv-async.c so "real" JavaScript bindings usable from other JavaScript code -haven't been exported yet. I suspect that one of the most important things to check when that is -done would be interoperability across different types. This is all easy in the C code because -the emscripten runtime heap is backed by typed Arrays, but there might be shenanigans to be had -mapping stuff across the binding - it's definitely do-able just don't know how fiddly. - -Other TODO the code base contains some tweaked emscripten library code, this is ultimately going +The code base contains some tweaked emscripten library code, this is ultimately going to get done properly, commited back to emscripten and removed from here. -The example send-async and recv-async are both pretty hacky at the moment and *not really asynchronous*!! -This will be addressed ASAP, I hate the timed loop and although the code is non-blocking it's not -asynchronous. I'm pretty sure that I know how to get emscripten to call back when data is available -on a WebSocket so hopefully this should be straightforward. - -There's probably a slightly more contentious discussion to be had about a proper asynchronous interface -in proton-c proper. I suspect that it could do with one in the brave new multicore world pushing out -data down non-blocking queues to consumer threads tends to scale better than a model of threads locking -a resource, which is the behaviour I suspect the current API likely steers clients towards. +The example send-async.c and recv-async.c are both pretty hacky at the moment. proton-j seems to use hawt-dispatch, which is modelled after Grand Central Dispatch so I need to work out what it's using it do do and whether there are parallels in proton-c http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/proton-c/bindings/javascript/binding.c ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/binding.c b/proton-c/bindings/javascript/binding.c index e91c3c8..425efb4 100644 --- a/proton-c/bindings/javascript/binding.c +++ b/proton-c/bindings/javascript/binding.c @@ -1,80 +1,4 @@ #include <stdio.h> -#include <stdlib.h> +// Just a stub. -/* -#include "proton/message.h" - -typedef struct { - size_t next; - size_t prev; - size_t down; - size_t parent; - size_t children; - pn_atom_t atom; - // for arrays - bool described; - pn_type_t type; - bool data; - size_t data_offset; - size_t data_size; - char *start; - bool small; -} pni_node_t; - -pni_node_t* pn_data_add(pn_data_t *data); - -int test(pn_data_t *data, int64_t l) -{ -printf("hello\n"); - - pni_node_t *node = pn_data_add(data); - node->atom.type = PN_LONG; - node->atom.u.as_long = l; - - return 0; -} -*/ - - - - - -/* -z_streamp inflateInitialise() { - z_streamp stream = malloc(sizeof(z_stream)); - stream->zalloc = Z_NULL; - stream->zfree = Z_NULL; - int ret = inflateInit(stream); - if (ret != Z_OK) { - return Z_NULL; - } else { - return stream; - } -} - -void inflateDestroy(z_streamp stream) { - inflateEnd(stream); - free(stream); -} - -int zinflate(z_streamp stream, - unsigned char* dest, unsigned long* destLen, - unsigned char* source, unsigned long sourceLen) { - int err; - int total = stream->total_out; - stream->avail_in = sourceLen; - stream->next_in = source; - - stream->avail_out = *destLen; - stream->next_out = dest; - - err = inflate(stream, Z_SYNC_FLUSH); - *destLen = stream->total_out - total; - - if (err != Z_OK) { - inflateEnd(stream); - } - return err; -} -*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/proton-c/bindings/javascript/binding.js ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/binding.js b/proton-c/bindings/javascript/binding.js index 08aeb3e..eceda54 100644 --- a/proton-c/bindings/javascript/binding.js +++ b/proton-c/bindings/javascript/binding.js @@ -37,7 +37,7 @@ * from minifying e.g. <pre>Module['Messenger'] = ...</pre> * Exported Objects can be used in client code using the appropriate namespace: * <pre> - * proton = require('proton.js'); + * proton = require('qpid-proton'); * var messenger = new proton.Messenger(); * var message = new proton.Message(); * </pre> @@ -102,6 +102,81 @@ Module['Error'] = { /*****************************************************************************/ /* */ +/* MessengerError */ +/* */ +/*****************************************************************************/ + +/** + * Constructs a proton.MessengerError instance. + * @classdesc This class is a subclass of Error. + * @constructor proton.MessengerError + * @param the error message. + */ +Module['MessengerError'] = function(message) { // MessengerError constructor. + this.name = "MessengerError"; + this.message = (message || ""); +}; + +Module['MessengerError'].prototype = new Error(); +Module['MessengerError'].prototype.constructor = Module['MessengerError']; + +Module['MessengerError'].prototype.toString = function() { + return this.name + ': ' + this.message +}; + + +/*****************************************************************************/ +/* */ +/* MessageError */ +/* */ +/*****************************************************************************/ + +/** + * Constructs a proton.MessageError instance. + * @classdesc This class is a subclass of Error. + * @constructor proton.MessageError + * @param the error message. + */ +Module['MessageError'] = function(message) { // MessageError constructor. + this.name = "MessageError"; + this.message = (message || ""); +}; + +Module['MessageError'].prototype = new Error(); +Module['MessageError'].prototype.constructor = Module['MessageError']; + +Module['MessageError'].prototype.toString = function() { + return this.name + ': ' + this.message +}; + + +/*****************************************************************************/ +/* */ +/* DataError */ +/* */ +/*****************************************************************************/ + +/** + * Constructs a proton.DataError instance. + * @classdesc This class is a subclass of Error. + * @constructor proton.DataError + * @param the error message. + */ +Module['DataError'] = function(message) { // DataError constructor. + this.name = "DataError"; + this.message = (message || ""); +}; + +Module['DataError'].prototype = new Error(); +Module['DataError'].prototype.constructor = Module['DataError']; + +Module['DataError'].prototype.toString = function() { + return this.name + ': ' + this.message +}; + + +/*****************************************************************************/ +/* */ /* Messenger */ /* */ /*****************************************************************************/ @@ -138,11 +213,37 @@ Module['Messenger'] = function(name) { // Messenger Constructor. * fundamentally an asynchronous non-blocking execution environment. */ _pn_messenger_set_blocking(this._messenger, false); + + // Used in the Event registration mechanism (in the 'on' and 'emit' methods). + this._callbacks = {}; + + /* + * The emscripten websocket error event could get triggered by any Messenger + * and it's hard to determine which one without knowing which file descriptors + * are associated with which instance. As a workaround we set the _checkErrors + * flag when we call put or subscribe and reset it when work succeeds. + */ + this._checkErrors = false; + + /** + * TODO update to handle multiple Messenger instances + * Handle the emscripten websocket error and use it to trigger a MessengerError + * Note that the emscripten websocket error passes an array containing the + * file descriptor, the errno and the message, we just use the message here. + */ + var that = this; + Module['websocket']['on']('error', function(error) { +console.log("that._checkErrors = " + that._checkErrors); +console.log("error = " + error); + if (that._checkErrors) { + that.emit('error', new Module['MessengerError'](error[2])); + } + }); }; Module['Messenger'].PN_CUMULATIVE = 0x1; // Protected Class attribute. - // Expose prototype as a variable to make method declarations less verbose. +// Expose prototype as a variable to make method declarations less verbose. var _Messenger_ = Module['Messenger'].prototype; // ************************* Protected methods ******************************** @@ -167,17 +268,29 @@ _Messenger_._check = function(code) { var errno = this['getErrno'](); var message = errno ? this['getError']() : Pointer_stringify(_pn_code(code)); - throw { // TODO Improve name and level. - name: 'Messenger Error', - level: 'Show Stopper', - message: message, - toString: function() {return this.name + ': ' + this.message} - }; + if (this._callbacks['error']) { + this.emit('error', new Module['MessengerError'](message)); + } else { + throw new Module['MessengerError'](message); + } } else { return code; } }; +/** + * Invokes a callback registered for a specified event. + * @method emit + * @memberof! proton.Messenger# + * @param event the event we want to emit. + * @param param the parameter we'd like to pass to the event callback. + */ +_Messenger_.emit = function(event, param) { + if ('function' === typeof this._callbacks[event]) { + this._callbacks[event].call(this, param); + } +}; + // *************************** Public methods ***************************** /** @@ -206,6 +319,40 @@ _Messenger_._check = function(code) { */ /** + * Registers a listener callback for a specified event. + * @method on + * @memberof! proton.Messenger# + * @param event the event we want to listen for. + * @param callback the callback function to be registered for the specified event. + */ +_Messenger_['on'] = function(event, callback) { + if ('function' === typeof callback) { + if (event === 'work') { + Module.EventDispatch.addListener(this, callback); + } else { + this._callbacks[event] = callback; + } + } +}; + +/** + * Removes a listener callback for a specified event. + * @method removeListener + * @memberof! proton.Messenger# + * @param event the event we want to detach from. + * @param callback the callback function to be remove for the specified event. + */ +_Messenger_['removeListener'] = function(event, callback) { + if ('function' === typeof callback) { + if (event === 'work') { + Module.EventDispatch.removeListener(this, callback); + } else { + this._callbacks[event] = null;//callback; + } + } +}; + +/** * Retrieves the name of a Messenger. * @method getName * @memberof! proton.Messenger# @@ -379,9 +526,11 @@ _Messenger_['subscribe'] = function(source) { this._check(Module['Error']['ARG_ERR']); } var sp = Runtime.stackSave(); + this._checkErrors = true; var subscription = _pn_messenger_subscribe(this._messenger, allocate(intArrayFromString(source), 'i8', ALLOC_STACK)); Runtime.stackRestore(sp); + if (!subscription) { this._check(Module['Error']['ERR']); } @@ -409,6 +558,7 @@ _Messenger_['subscribe'] = function(source) { */ _Messenger_['put'] = function(message) { message._preEncode(); + this._checkErrors = true; this._check(_pn_messenger_put(this._messenger, message._message)); // Getting the tracker is a little tricky as it is a 64 bit number. The way @@ -451,7 +601,6 @@ _Messenger_['isBuffered'] = function(tracker) { * @param {proton.Data.Long} tracker the tracker identifying the delivery. */ _Messenger_['settle'] = function(tracker) { -console.log("settle: not fully tested yet"); // Getting the tracker is a little tricky as it is a 64 bit number. The way // emscripten handles this is to return the low 32 bits directly and pass // the high 32 bits via the tempRet0 variable. We use Data.Long to pass the @@ -469,15 +618,24 @@ console.log("settle: not fully tested yet"); /** * Sends or receives any outstanding messages queued for a Messenger. - * For JavaScript the only timeout that makes sense is 0 == do not block. + * For JavaScript the only timeout that makes sense is 0 (do not block). * This method may also do I/O work other than sending and receiving messages. * For example, closing connections after messenger.stop() has been called. * @method work * @memberof! proton.Messenger# - * @returns {number} 0 if no work to do, < 0 if error, or 1 if work was done. + * @returns {boolean} true if there is work still to do, false otherwise. */ _Messenger_['work'] = function() { - return _pn_messenger_work(this._messenger, 0); + var err = _pn_messenger_work(this._messenger, 0); + if (err === Module['Error']['TIMEOUT']) { +console.log("work = false"); + return false; + } else { + this._checkErrors = false; + this._check(err); +console.log("work = true"); + return true; + } }; /** @@ -513,9 +671,16 @@ _Messenger_['receiving'] = function() { * @memberof! proton.Messenger# * @param {proton.Message} message the destination message object. If no Message * object is supplied, the Message popped from the head of the queue is discarded. + * @param {boolean} decodeBinaryAsString if set decode any AMQP Binary payload + * objects as strings. This can be useful as the data in Binary objects + * will be overwritten with subsequent calls to get, so they must be + * explicitly copied. Needless to say it is only safe to set this flag if + * you know that the data you are dealing with is actually a string, for + * example C/C++ applications often seem to encode strings as AMQP binary, + * a common cause of interoperability problems. * @returns {proton.Data.Long} a tracker for the incoming Message. */ -_Messenger_['get'] = function(message) { +_Messenger_['get'] = function(message, decodeBinaryAsString) { var impl = null; if (message) { impl = message._message; @@ -524,7 +689,7 @@ _Messenger_['get'] = function(message) { this._check(_pn_messenger_get(this._messenger, impl)); if (message) { - message._postDecode(); + message._postDecode(decodeBinaryAsString); } // Getting the tracker is a little tricky as it is a 64 bit number. The way @@ -533,8 +698,6 @@ _Messenger_['get'] = function(message) { // low/high pair around to methods that require a tracker. var low = _pn_messenger_incoming_tracker(this._messenger); var high = Runtime.getTempRet0(); -console.log("get low = " + low); -console.log("get high = " + high); return new Data.Long(low, high); }; @@ -548,8 +711,6 @@ console.log("get high = " + high); * for this Messenger. */ _Messenger_['incomingSubscription'] = function() { -console.log("incomingSubscription: haven't yet proved this works yet"); - var subscription = _pn_messenger_incoming_subscription(this._messenger); if (subscription) { return new Subscription(subscription); @@ -568,7 +729,6 @@ console.log("incomingSubscription: haven't yet proved this works yet"); * @param {proton.Data.Long} tracker the tracker identifying the delivery. */ _Messenger_['accept'] = function(tracker) { -console.log("accept: not fully tested yet"); // Getting the tracker is a little tricky as it is a 64 bit number. The way // emscripten handles this is to return the low 32 bits directly and pass // the high 32 bits via the tempRet0 variable. We use Data.Long to pass the @@ -593,7 +753,6 @@ console.log("accept: not fully tested yet"); * @param {proton.Data.Long} tracker the tracker identifying the delivery. */ _Messenger_['reject'] = function(tracker) { -console.log("reject: not fully tested yet"); // Getting the tracker is a little tricky as it is a 64 bit number. The way // emscripten handles this is to return the low 32 bits directly and pass // the high 32 bits via the tempRet0 variable. We use Data.Long to pass the @@ -629,17 +788,61 @@ _Messenger_['incoming'] = function() { return _pn_messenger_incoming(this._messenger); }; - - /** - * + * Adds a routing rule to a Messenger's internal routing table. + * <p> + * The route method may be used to influence how a messenger will internally treat + * a given address or class of addresses. Every call to the route method will + * result in messenger appending a routing rule to its internal routing table. + * <p> + * Whenever a message is presented to a messenger for delivery, it will match the + * address of this message against the set of routing rules in order. The first + * rule to match will be triggered, and instead of routing based on the address + * presented in the message, the messenger will route based on the address supplied + * in the rule. + * <p> + * The pattern matching syntax supports two types of matches, a '' will match any + * character except a '/', and a '*' will match any character including a '/'. + * <p> + * A routing address is specified as a normal AMQP address, however it may + * additionally use substitution variables from the pattern match that triggered + * the rule. + * <p> + * Any message sent to "foo" will be routed to "amqp://foo.com": + * <pre> + * route("foo", "amqp://foo.com"); + * </pre> + * Any message sent to "foobar" will be routed to "amqp://foo.com/bar": + * <pre> + * route("foobar", "amqp://foo.com/bar"); + * </pre> + * Any message sent to bar/<path> will be routed to the corresponding path within + * the amqp://bar.com domain: + * <pre> + * route("bar/*", "amqp://bar.com/$1"); + * </pre> + * Supply credentials for foo.com: + * <pre> + * route("amqp://foo.com/*", "amqp://user:[email protected]/$1"); + * </pre> + * Supply credentials for all domains: + * <pre> + * route("amqp://*", "amqp://user:password@$1"); + * </pre> + * Route all addresses through a single proxy while preserving the original destination: + * <pre> + * route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); + * </pre> + * Route any address through a single broker: + * <pre> + * route("*", "amqp://user:password@broker/$1"); + * </pre> * @method route * @memberof! proton.Messenger# * @param {string} pattern a glob pattern to select messages. * @param {string} address an address indicating outgoing address rewrite. */ _Messenger_['route'] = function(pattern, address) { -console.log("route: not fully tested yet"); var sp = Runtime.stackSave(); this._check(_pn_messenger_route(this._messenger, allocate(intArrayFromString(pattern), 'i8', ALLOC_STACK), @@ -648,6 +851,8 @@ console.log("route: not fully tested yet"); }; /** + * Rewrite message addresses prior to transmission. + * <p> * Similar to route(), except that the destination of the Message is determined * before the message address is rewritten. * <p> @@ -665,7 +870,6 @@ console.log("route: not fully tested yet"); * @param {string} address an address indicating outgoing address rewrite. */ _Messenger_['rewrite'] = function(pattern, address) { -console.log("rewrite: not fully tested yet"); var sp = Runtime.stackSave(); this._check(_pn_messenger_rewrite(this._messenger, allocate(intArrayFromString(pattern), 'i8', ALLOC_STACK), @@ -674,43 +878,133 @@ console.log("rewrite: not fully tested yet"); }; +/*****************************************************************************/ +/* */ +/* EventDispatch */ +/* */ +/*****************************************************************************/ +/** + * EventDispatch is a Singleton class that allows callbacks to be registered that + * will get triggered by the emscripten WebSocket network callbacks. Clients of + * Messenger will register callbacks by calling: + * <pre> + * messenger.on('work', <callback function>); + * </pre> + * EventDispatch supports callback registration from multiple Messenger instances + * and supports multiple callbacks being registered for each instance. The client + * callbacks will actually be called when a given messenger has work available + * or a WebSocket close has been occurred (in which case all registered callbacks + * will be called). + * <p> + * The approach implemented here allows the registered callbacks to follow a + * similar pattern to _process_incoming and _process_outgoing in async.py + * @memberof proton + */ +Module.EventDispatch = new function() { // Note the use of new to create a Singleton. + var _firstCall = true; // Flag used to check the first time addListener is called. + var _messengers = {}; -// TODO This needs tweaking to enable working with multiple Messenger instances. -_Messenger_['setNetworkCallback'] = function(callback) { -//console.log("setting network callback"); + /** + * Provides functionality roughly equivalent to the following C code: + * while (1) { + * pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity. + * process(); + * } + * The blocking call isn't viable in JavaScript as it is entirely asynchronous + * and we wouldn't want to replace the while(1) with a timed loop either!! + * This method gets triggered asynchronously by the emscripten socket events and + * we then perform an equivalent loop for each messenger, triggering every + * registered callback whilst there is work remaining. If triggered by close + * we bypass the _pn_messenger_work test as it will never succeed after closing. + */ + var _pump = function(fd, closing) { + for (var i in _messengers) { + if (_messengers.hasOwnProperty(i)) { + var current = _messengers[i]; + + if (closing) { + current.invokeCallbacks(); + } else { + var messenger = current.messenger; + while (_pn_messenger_work(messenger._messenger, 0) >= 0) { + messenger._checkErrors = false; + current.invokeCallbacks(); + } + } + } + } + }; - // Expose messenger reference in the scope of Messenger.Messenger so that - // the _work function can correctly dereference it. - var messenger = this._messenger; + /** + * Listener for the emscripten socket close event. Delegates to _pump() + * passing a flag to indicate that the socket is closing. + */ + var _close = function(fd) { + _pump(fd, true); + }; - function _work() { - //console.log(" *** internal work ***"); + /** + * Initialises the emscripten network callback functions. This needs to be + * done the first time we call addListener rather that when we create the + * Singleton because emscripten's socket filesystem has to be mounted before + * we can register listeners for any of these events. + */ + var _init = function() { + Module['websocket']['on']('open', _pump); + Module['websocket']['on']('connection', _pump); + Module['websocket']['on']('message', _pump); + Module['websocket']['on']('close', _close); + }; - var err = _pn_messenger_work(messenger, 0); -//console.log("err = " + err); + /** + * Add a listener callback for the specified Messenger. Multiple listeners + * are permitted for each Messenger and listeners can be registered for + * multiple Messenger instances. The first time this method is called we + * initialise the emscripten network callback functions. + */ + this.addListener = function(messenger, callback) { + if (_firstCall) { + _init(); + _firstCall = false; + } - if (err >= 0) { - callback(); + var name = messenger.getName(); + if (!_messengers[name]) { + _messengers[name] = { + messenger: messenger, + callbacks: [], + invokeCallbacks: function() { + for (var j = 0; j < this.callbacks.length; j++) { + this.callbacks[j](); + } + } + }; } - err = _pn_messenger_work(messenger, 0); -//console.log("err = " + err); + _messengers[name].callbacks.push(callback); + }; - if (err >= 0) { - callback(); + /** + * Remove the specified listener callback from the specified Messenger. + */ + this.removeListener = function(messenger, callback) { + var name = messenger.getName(); + if (_messengers[name]) { + // If we find the registered Messenger search for the specified callback. + var callbacks = _messengers[name].callbacks; + for (var j = 0; j < callbacks.length; j++) { + if (callback === callbacks[j]) { + // If we find the specified callback delete it and return. + callbacks.splice(j, 1); + return; + } + } } }; - - // Set the emscripten network callback function. - Module['networkCallback'] = _work; }; - - - - /*****************************************************************************/ /* */ /* Subscription */ @@ -767,41 +1061,43 @@ Subscription.prototype['getAddress'] = function() { /** * Constructs a proton.Message instance. - * @classdesc This class is + * @classdesc This class is a mutable holder of message content that may be used + * to generate and encode or decode and access AMQP formatted message data. * @constructor proton.Message + * @property {object} instructions delivery instructions for the message. + * @property {object} annotations infrastructure defined message annotations. + * @property {object} properties application defined message properties. + * @property {object} body message body as a native JavaScript Object. + * @property {object} data message body as a proton.Data Object. */ Module['Message'] = function() { // Message Constructor. this._message = _pn_message(); + this._id = new Data(_pn_message_id(this._message)); + this._correlationId = new Data(_pn_message_correlation_id(this._message)); // ************************* Public properties **************************** - /** - * Delivery instructions for the Message. - * @type map - */ this['instructions'] = null; - - /** - * Infrastructure defined Message annotations. - * @type map - */ this['annotations'] = null; - /** - * Application defined Message properties. - * @type map - */ + // Intitialise with an empty Object so we can set properties in a natural way. + // message.properties.prop1 = "foo"; + // message.properties.prop2 = "bar"; this['properties'] = {}; - /** - * Message body. - * @type bytes | unicode | map | list | int | long | float | UUID - */ this['body'] = null; + this['data'] = null; }; +// Expose constructor as package scope variable to make internal calls less verbose. +var Message = Module['Message']; + // Expose prototype as a variable to make method declarations less verbose. -var _Message_ = Module['Message'].prototype; +var _Message_ = Message.prototype; + +// ************************** Class properties ******************************** + +Message['DEFAULT_PRIORITY'] = 4; /** Default priority for messages.*/ // ************************* Protected methods ******************************** @@ -821,12 +1117,7 @@ _Message_._check = function(code) { var errno = this['getErrno'](); var message = errno ? this['getError']() : Pointer_stringify(_pn_code(code)); - throw { // TODO Improve name and level. - name: 'Message Error', - level: 'Show Stopper', - message: message, - toString: function() {return this.name + ': ' + this.message} - }; + throw new Module['MessageError'](message); } else { return code; } @@ -845,37 +1136,40 @@ _Message_._preEncode = function() { inst.clear(); if (this['instructions']) { -console.log("Encoding instructions"); inst['putObject'](this['instructions']); } ann.clear(); if (this['annotations']) { -console.log("Encoding annotations"); ann['putObject'](this['annotations']); } props.clear(); if (this['properties']) { -console.log("Encoding properties"); props['putObject'](this['properties']); } body.clear(); if (this['body']) { -console.log("Encoding body"); body['putObject'](this['body']); } }; /** * Decode the Message after receiving off the wire. - */ -_Message_._postDecode = function() { + * @param {boolean} decodeBinaryAsString if set decode any AMQP Binary payload + * objects as strings. This can be useful as the data in Binary objects + * will be overwritten with subsequent calls to get, so they must be + * explicitly copied. Needless to say it is only safe to set this flag if + * you know that the data you are dealing with is actually a string, for + * example C/C++ applications often seem to encode strings as AMQP binary, + * a common cause of interoperability problems. + */ +_Message_._postDecode = function(decodeBinaryAsString) { var inst = new Data(_pn_message_instructions(this._message)); var ann = new Data(_pn_message_annotations(this._message)); var props = new Data(_pn_message_properties(this._message)); - var body = new Data(_pn_message_body(this._message)); + var body = new Data(_pn_message_body(this._message), decodeBinaryAsString); if (inst.next()) { this['instructions'] = inst['getObject'](); @@ -896,8 +1190,10 @@ _Message_._postDecode = function() { } if (body.next()) { + this['data'] = body; this['body'] = body['getObject'](); } else { + this['data'] = null; this['body'] = null; } }; @@ -935,6 +1231,293 @@ _Message_['getError'] = function() { }; /** + * Clears the contents of the Message. All fields will be reset to their default values. + * @method clear + * @memberof! proton.Message# + */ +_Message_['clear'] = function() { + _pn_message_clear(this._message); + this['instructions'] = null; + this['annotations'] = null; + this['properties'] = {}; + this['body'] = null; + this['data'] = null; +}; + +/** + * Get the inferred flag for a message. + * <p> + * The inferred flag for a message indicates how the message content + * is encoded into AMQP sections. If inferred is true then binary and + * list values in the body of the message will be encoded as AMQP DATA + * and AMQP SEQUENCE sections, respectively. If inferred is false, + * then all values in the body of the message will be encoded as AMQP + * VALUE sections regardless of their type. Use + * {@link proton.Message.setInferred} to set the value. + * @method isInferred + * @memberof! proton.Message# + * @returns {boolean} true iff the inferred flag for the message is set. + */ +_Message_['isInferred'] = function() { + return (_pn_message_is_inferred(this._message) > 0); +}; + +/** + * Set the inferred flag for a message. See {@link proton.Message.isInferred} + * for a description of what the inferred flag is. + * @method setInferred + * @memberof! proton.Message# + * @param {boolean} inferred the new value of the inferred flag. + */ +_Message_['setInferred'] = function(inferred) { + this._check(_pn_message_set_inferred(this._message, inferred)); +}; + +/** + * Get the durable flag for a message. + * <p> + * The durable flag indicates that any parties taking responsibility + * for the message must durably store the content. Use + * {@link proton.Message.setDurable} to set the value. + * @method isDurable + * @memberof! proton.Message# + * @returns {boolean} true iff the durable flag for the message is set. + */ +_Message_['isDurable'] = function() { + return (_pn_message_is_durable(this._message) > 0); +}; + +/** + * Set the durable flag for a message. See {@link proton.Message.isDurable} + * for a description of what the durable flag is. + * @method setDurable + * @memberof! proton.Message# + * @param {boolean} durable the new value of the durable flag. + */ +_Message_['setDurable'] = function(durable) { + this._check(_pn_message_set_durable(this._message, durable)); +}; + +/** + * Get the priority for a message. + * <p> + * The priority of a message impacts ordering guarantees. Within a + * given ordered context, higher priority messages may jump ahead of + * lower priority messages. Priority range is 0..255 + * @method getPriority + * @memberof! proton.Message# + * @returns {number} the priority of the Message. + */ +_Message_['getPriority'] = function() { + return _pn_message_get_priority(this._message) & 0xFF; // & 0xFF converts to unsigned. +}; + +/** + * Set the priority of the Message. See {@link proton.Message.getPriority} + * for details on message priority. + * @method setPriority + * @memberof! proton.Message# + * @param {number} priority the address we want to send the Message to. + */ +_Message_['setPriority'] = function(priority) { + this._check(_pn_message_set_priority(this._message, priority)); +}; + +/** + * Get the ttl for a message. + * <p> + * The ttl for a message determines how long a message is considered + * live. When a message is held for retransmit, the ttl is + * decremented. Once the ttl reaches zero, the message is considered + * dead. Once a message is considered dead it may be dropped. Use + * {@link proton.Message.setTTL} to set the ttl for a message. + * @method getTTL + * @memberof! proton.Message# + * @returns {number} the ttl in milliseconds. + */ +_Message_['getTTL'] = function() { + return _pn_message_get_ttl(this._message); +}; + +/** + * Set the ttl for a message. See {@link proton.Message.getTTL} + * for a detailed description of message ttl. + * @method setTTL + * @memberof! proton.Message# + * @param {number} ttl the new value for the message ttl in milliseconds. + */ +_Message_['setTTL'] = function(ttl) { + this._check(_pn_message_set_ttl(this._message, ttl)); +}; + +/** + * Get the first acquirer flag for a message. + * <p> + * When set to true, the first acquirer flag for a message indicates + * that the recipient of the message is the first recipient to acquire + * the message, i.e. there have been no failed delivery attempts to + * other acquirers. Note that this does not mean the message has not + * been delivered to, but not acquired, by other recipients. + * @method isFirstAcquirer + * @memberof! proton.Message# + * @returns {boolean} true iff the first acquirer flag for the message is set. + */ +_Message_['isFirstAcquirer'] = function() { + return (_pn_message_is_first_acquirer(this._message) > 0); +}; + +/** + * Set the first acquirer flag for a message. See {@link proton.Message.isFirstAcquirer} + * for details on the first acquirer flag. + * @method setFirstAcquirer + * @memberof! proton.Message# + * @param {boolean} first the new value of the first acquirer flag. + */ +_Message_['setFirstAcquirer'] = function(first) { + this._check(_pn_message_set_first_acquirer(this._message, first)); +}; + +/** + * Get the delivery count for a message. + * <p> + * The delivery count field tracks how many attempts have been made to + * deliver a message. Use {@link proton.Message.setDeliveryCount} to set + * the delivery count for a message. + * @method getDeliveryCount + * @memberof! proton.Message# + * @returns {number} the delivery count for the message. + */ +_Message_['getDeliveryCount'] = function() { + return _pn_message_get_delivery_count(this._message); +}; + +/** + * Set the delivery count for a message. See {@link proton.Message.getDeliveryCount} + * for details on what the delivery count means. + * @method setDeliveryCount + * @memberof! proton.Message# + * @param {number} count the new delivery count. + */ +_Message_['setDeliveryCount'] = function(count) { + this._check(_pn_message_set_delivery_count(this._message, count)); +}; + +/** + * Get the id for a message. + * <p> + * The message id provides a globally unique identifier for a message. + * A message id can be an a string, an unsigned long, a uuid or a binary value. + * @method getID + * @memberof! proton.Message# + * @returns {(number|string|proton.Data.Long|proton.Data.Uuid|proton.Data.Binary)} the message id. + */ +_Message_['getID'] = function() { + return this._id['getObject'](); +}; + +/** + * Set the id for a message. See {@link proton.Message.getID} + * for more details on the meaning of the message id. Note that only string, + * unsigned long, uuid, or binary values are permitted. + * @method setID + * @memberof! proton.Message# + * @param {(number|string|proton.Data.Long|proton.Data.Uuid|proton.Data.Binary)} id the + * new value of the message id. + */ +_Message_['setID'] = function(id) { + this._id['rewind'](); + if (Data.isNumber(id)) { + this._id['putULONG'](id); + } else { + this._id['putObject'](id); + } +}; + +/** + * Get the user id of the message creator. + * <p> + * The underlying raw data of the returned {@link proton.Data.Binary} will be + * valid until any one of the following operations occur: + * <pre> + * - {@link proton.Message.free} + * - {@link proton.Message.clear} + * - {@link proton.Message.setUserID} + * </pre> + * @method getUserID + * @memberof! proton.Message# + * @returns {proton.Data.Binary} the message's user id. + */ +_Message_['getUserID'] = function() { + var sp = Runtime.stackSave(); + // The implementation here is a bit "quirky" due to some low-level details + // of the interaction between emscripten and LLVM and the use of pn_bytes. + // The JavaScript code below is basically a binding to: + // + // pn_bytes_t bytes = pn_message_get_user_id(message); + + // Here's the quirky bit, pn_message_get_user_id actually returns pn_bytes_t + // *by value* but the low-level code handles this *by pointer* so we first + // need to allocate 8 bytes storage for {size, start} on the emscripten stack + // and then we pass the pointer to that storage as the first parameter to the + // compiled pn_message_get_user_id. + var bytes = allocate(8, 'i8', ALLOC_STACK); + _pn_message_get_user_id(bytes, this._message); + + // The bytes variable is really of type pn_bytes_t* so we use emscripten's + // getValue() call to retrieve the size and then the start pointer. + var size = getValue(bytes, 'i32'); + var start = getValue(bytes + 4, '*'); + + // Create a proton.Data.Binary from the pn_bytes_t information. + var binary = new Data['Binary'](size, start); + + // Tidy up the memory that we allocated on emscripten's stack. + Runtime.stackRestore(sp); + + return binary; +}; + +/** + * Set the user id for a message. This method takes a {@link proton.Data.Binary} + * consuming the underlying raw data in the process. For convenience this method + * also accepts a {@link proton.Data.Uuid} or a string, converting them to a + * Binary internally. N.B. getUserID always returns a {@link proton.Data.Binary} + * even if a string or {@link proton.Data.Uuid} has been passed to setUserID. + * @method setUserID + * @memberof! proton.Message# + * @param {(string||proton.Data.Uuid)} id the new user id for the message. + */ +_Message_['setUserID'] = function(id) { + // If the id parameter is a proton.Data.Binary use it otherwise create a Binary + // using the string form of the parameter that was passed. + id = (id instanceof Data['Binary']) ? id : new Data['Binary']('' + id); + + var sp = Runtime.stackSave(); + // The implementation here is a bit "quirky" due to some low-level details + // of the interaction between emscripten and LLVM and the use of pn_bytes. + // The JavaScript code below is basically a binding to: + // + // pn_message_set_user_id(message, pn_bytes(id.size, id.start)); + + // Here's the quirky bit, pn_bytes actually returns pn_bytes_t *by value* but + // the low-level code handles this *by pointer* so we first need to allocate + // 8 bytes storage for {size, start} on the emscripten stack and then we + // pass the pointer to that storage as the first parameter to the pn_bytes. + var bytes = allocate(8, 'i8', ALLOC_STACK); + _pn_bytes(bytes, id.size, id.start); + + // The compiled pn_message_set_user_id takes the pn_bytes_t by reference not value. + this._check(_pn_message_set_user_id(this._message, bytes)); + + // After calling _pn_message_set_user_id the underlying Message object "owns" the + // binary data, so we can call free on the proton.Data.Binary instance to + // release any storage it has acquired back to the emscripten heap. + id['free'](); + Runtime.stackRestore(sp); +}; + +/** + * Get the address for a message. * @method getAddress * @memberof! proton.Message# * @returns {string} the address of the Message. @@ -956,6 +1539,7 @@ _Message_['setAddress'] = function(address) { }; /** + * Get the subject for a message. * @method getSubject * @memberof! proton.Message# * @returns {string} the subject of the Message. @@ -976,17 +1560,302 @@ _Message_['setSubject'] = function(subject) { Runtime.stackRestore(sp); }; +/** + * Get the reply to for a message. + * @method getReplyTo + * @memberof! proton.Message# + * @returns {string} the reply to of the Message. + */ +_Message_['getReplyTo'] = function() { + return Pointer_stringify(_pn_message_get_reply_to(this._message)); +}; +/** + * Set the reply to for a message. + * @method setReplyTo + * @memberof! proton.Message# + * @param {string} reply the reply to we want to set for the Message. + */ +_Message_['setReplyTo'] = function(reply) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_reply_to(this._message, allocate(intArrayFromString(reply), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; +/** + * Get the correlation id for a message. + * <p> + * A correlation id can be an a string, an unsigned long, a uuid or a binary value. + * @method getCorrelationID + * @memberof! proton.Message# + * @returns {(number|string|proton.Data.Long|proton.Data.Uuid|proton.Data.Binary)} the message id. + */ +_Message_['getCorrelationID'] = function() { + return this._correlationId['getObject'](); +}; +/** + * Set the correlation id for a message. See {@link proton.Message.getCorrelationID} + * for more details on the meaning of the correlation id. Note that only string, + * unsigned long, uuid, or binary values are permitted. + * @method setCorrelationID + * @memberof! proton.Message# + * @param {(number|string|proton.Data.Long|proton.Data.Uuid|proton.Data.Binary)} id the + * new value of the correlation id. + */ +_Message_['setCorrelationID'] = function(id) { + this._correlationId['rewind'](); + if (Data.isNumber(id)) { + this._correlationId['putULONG'](id); + } else { + this._correlationId['putObject'](id); + } +}; +/** + * Get the content type for a message. + * @method getContentType + * @memberof! proton.Message# + * @returns {string} the content type of the Message. + */ +_Message_['getContentType'] = function() { + return Pointer_stringify(_pn_message_get_content_type(this._message)); +}; +/** + * Set the content type for a message. + * @method setContentType + * @memberof! proton.Message# + * @param {string} type the content type we want to set for the Message. + */ +_Message_['setContentType'] = function(type) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_content_type(this._message, allocate(intArrayFromString(type), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; +/** + * Get the content encoding for a message. + * @method getContentEncoding + * @memberof! proton.Message# + * @returns {string} the content encoding of the Message. + */ +_Message_['getContentEncoding'] = function() { + return Pointer_stringify(_pn_message_get_content_encoding(this._message)); +}; +/** + * Set the content encoding for a message. + * @method setContentEncoding + * @memberof! proton.Message# + * @param {string} encoding the content encoding we want to set for the Message. + */ +_Message_['setContentEncoding'] = function(encoding) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_content_encoding(this._message, allocate(intArrayFromString(encoding), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + +/** + * Get the expiry time for a message. + * A zero value for the expiry time indicates that the message will + * never expire. This is the default value. + * @method getExpiryTime + * @memberof! proton.Message# + * @returns {Date} the expiry time for the message. + */ +_Message_['getExpiryTime'] = function() { + // Getting the timestamp is a little tricky as it is a 64 bit number. The way + // emscripten handles this is to return the low 32 bits directly and pass + // the high 32 bits via the tempRet0 variable. We use Data.Long to hold + // the 64 bit number and Data.Long.toNumber() to convert it back into a + // JavaScript number. + var low = _pn_message_get_expiry_time(this._message); + var high = Runtime.getTempRet0(); + var long = new Data.Long(low, high); + long = long.toNumber(); + return new Date(long); +}; + +/** + * Set the expiry time for a message. + * @method setExpiryTime + * @memberof! proton.Message# + * @param {(number|Date)} time the new expiry time for the message. + */ +_Message_['setExpiryTime'] = function(time) { + // Note that a timestamp is a 64 bit number so we have to use a proton.Data.Long. + var timestamp = Data.Long.fromNumber(time.valueOf()); + this._check(_pn_message_set_expiry_time(this._message, timestamp.getLowBitsUnsigned(), timestamp.getHighBits())); +}; +/** + * Get the creation time for a message. + * A zero value for the creation time indicates that the creation time + * has not been set. This is the default value. + * @method getCreationTime + * @memberof! proton.Message# + * @returns {Date} the creation time for the message. + */ +_Message_['getCreationTime'] = function() { + // Getting the timestamp is a little tricky as it is a 64 bit number. The way + // emscripten handles this is to return the low 32 bits directly and pass + // the high 32 bits via the tempRet0 variable. We use Data.Long to hold + // the 64 bit number and Data.Long.toNumber() to convert it back into a + // JavaScript number. + var low = _pn_message_get_creation_time(this._message); + var high = Runtime.getTempRet0(); + var long = new Data.Long(low, high); + long = long.toNumber(); + return new Date(long); +}; +/** + * Set the creation time for a message. + * @method setCreationTime + * @memberof! proton.Message# + * @param {(number|Date)} time the new creation time for the message. + */ +_Message_['setCreationTime'] = function(time) { + // Note that a timestamp is a 64 bit number so we have to use a proton.Data.Long. + var timestamp = Data.Long.fromNumber(time.valueOf()); + this._check(_pn_message_set_creation_time(this._message, timestamp.getLowBitsUnsigned(), timestamp.getHighBits())); +}; +/** + * Get the group id for a message. + * @method getGroupID + * @memberof! proton.Message# + * @returns {string} the group id of the Message. + */ +_Message_['getGroupID'] = function() { + return Pointer_stringify(_pn_message_get_group_id(this._message)); +}; +/** + * Set the group id for a message. + * @method setGroupID + * @memberof! proton.Message# + * @param {string} id the group id we want to set for the Message. + */ +_Message_['setGroupID'] = function(id) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_group_id(this._message, allocate(intArrayFromString(id), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + +/** + * Get the group sequence for a message. + * <p> + * The group sequence of a message identifies the relative ordering of + * messages within a group. The default value for the group sequence + * of a message is zero. + * @method getGroupSequence + * @memberof! proton.Message# + * @returns {number} the group sequence for the message. + */ +_Message_['getGroupSequence'] = function() { + return _pn_message_get_group_sequence(this._message); +}; + +/** + * Set the group sequence for a message. See {@link proton.Message.getGroupSequence} + * for details on what the group sequence means. + * @method setGroupSequence + * @memberof! proton.Message# + * @param {number} n the new group sequence for the message. + */ +_Message_['setGroupSequence'] = function(n) { + this._check(_pn_message_set_group_sequence(this._message, n)); +}; + +/** + * Get the reply to group id for a message. + * @method getReplyToGroupID + * @memberof! proton.Message# + * @returns {string} the reply to group id of the Message. + */ +_Message_['getReplyToGroupID'] = function() { + return Pointer_stringify(_pn_message_get_reply_to_group_id(this._message)); +}; + +/** + * Set the reply to group id for a message. + * @method setReplyToGroupID + * @memberof! proton.Message# + * @param {string} id the reply to group id we want to set for the Message. + */ +_Message_['setReplyToGroupID'] = function(id) { + var sp = Runtime.stackSave(); + this._check(_pn_message_set_reply_to_group_id(this._message, allocate(intArrayFromString(id), 'i8', ALLOC_STACK))); + Runtime.stackRestore(sp); +}; + +/** + * The following methods are marked as deprecated and are not implemented. + * pn_message_get_format() + * pn_message_set_format() + * pn_message_load() + * pn_message_load_data() + * pn_message_load_text() + * pn_message_load_amqp() + * pn_message_load_json() + * pn_message_save() + * pn_message_save_data() + * pn_message_save_text() + * pn_message_save_amqp() + * pn_message_save_json() + * pn_message_data() + */ + +/** + * Return a Binary representation of the message encoded in AMQP format. N.B. the + * returned {@link proton.Data.Binary} "owns" the underlying raw data and is thus + * responsible for freeing it or passing it to a method that consumes a Binary + * such as {@link proton.Message.decode}. + * @method encode + * @memberof! proton.Data# + * @returns {proton.Data.Binary} a representation of the message encoded in AMQP format. + */ +_Message_['encode'] = function() { + this._preEncode(); + var size = 1024; + while (true) { + setValue(size, size, 'i32'); // Set pass by reference variable. + var bytes = _malloc(size); // Allocate storage from emscripten heap. + var err = _pn_message_encode(this._message, bytes, size); + var size = getValue(size, 'i32'); // Dereference the real size value; + + if (err === Module['Error']['OVERFLOW']) { + _free(bytes); + size *= 2; + } else if (err >= 0) { + return new Data['Binary'](size, bytes); + } else { + _free(bytes); + this._check(err); + return; + } + } +}; + +/** + * Decodes and loads the message content from supplied Binary AMQP data N.B. + * this method "consumes" data from a {@link proton.Data.Binary} in other words + * it takes responsibility for the underlying data and frees the raw data from + * the Binary. + * @method decode + * @memberof! proton.Data# + * @param {proton.Data.Binary} encoded the AMQP encoded binary message. + */ +_Message_['decode'] = function(encoded) { + var err = _pn_message_decode(this._message, encoded.start, encoded.size); + encoded['free'](); // Free the original Binary. + if (err >= 0) { + this._postDecode(); + } + this._check(err); +}; /*****************************************************************************/ @@ -1025,8 +1894,15 @@ _Message_['setSubject'] = function(subject) { * necessary. If no data is supplied then the Data is stand-alone and the * client application is responsible for freeing the underlying data via * a call to free(). - */ -Module['Data'] = function(data) { // Data Constructor. + * @param {boolean} decodeBinaryAsString if set decode any AMQP Binary payload + * objects as strings. This can be useful as the data in Binary objects + * will be overwritten with subsequent calls to get, so they must be + * explicitly copied. Needless to say it is only safe to set this flag if + * you know that the data you are dealing with is actually a string, for + * example C/C++ applications often seem to encode strings as AMQP binary, + * a common cause of interoperability problems. + */ +Module['Data'] = function(data, decodeBinaryAsString) { // Data Constructor. if (!data) { this._data = _pn_data(16); // Default capacity is 16 this['free'] = function() { @@ -1038,6 +1914,7 @@ Module['Data'] = function(data) { // Data Constructor. this._data = data; this['free'] = function() {}; } + this._decodeBinaryAsString = decodeBinaryAsString; }; // Expose constructor as package scope variable to make internal calls less verbose. @@ -1821,12 +2698,7 @@ _Data_._check = function(code) { var errno = this['getErrno'](); var message = errno ? this['getError']() : Pointer_stringify(_pn_code(code)); - throw { // TODO Improve name and level. - name: 'Data Error', - level: 'Show Stopper', - message: message, - toString: function() {return this.name + ': ' + this.message} - }; + throw new Module['DataError'](message); } else { return code; } @@ -1966,7 +2838,7 @@ _Data_['type'] = function() { * returned {@link proton.Data.Binary} "owns" the underlying raw data and is thus * responsible for freeing it or passing it to a method that consumes a Binary * such as {@link proton.Data.decode} or {@link proton.Data.putBINARY}. - * @method type + * @method encode * @memberof! proton.Data# * @returns {proton.Data.Binary} a representation of the data encoded in AMQP format. */ @@ -2221,7 +3093,7 @@ _Data_['putLONG'] = function(l) { * Puts a timestamp. * @method putTIMESTAMP * @memberof! proton.Data# - * @param {Date} d a Date value. + * @param {(number|Date)} d a Date value. */ _Data_['putTIMESTAMP'] = function(d) { // Note that a timestamp is a 64 bit number so we have to use a proton.Data.Long. @@ -2596,7 +3468,7 @@ _Data_['getULONG'] = function() { var high = Runtime.getTempRet0(); var long = new Data.Long(low, high); long = long.toNumber(); - return (long > 0) ? long : Data.Long.TWO_PWR_64_DBL_ + long; + return (long >= 0) ? long : Data.Long.TWO_PWR_64_DBL_ + long; }; /** @@ -2744,7 +3616,12 @@ _Data_['getBINARY'] = function() { // Tidy up the memory that we allocated on emscripten's stack. Runtime.stackRestore(sp); - return binary; + // If _decodeBinaryAsString is set return the stringified form of the Binary. + if (this._decodeBinaryAsString) { + return binary.toString(); + } else { + return binary; + } }; /** @@ -3101,11 +3978,7 @@ _Data_['getARRAY'] = function() { * @param {object} obj the JavaScript Object or primitive to be serialised. */ _Data_['putObject'] = function(obj) { -console.log("Data.putObject"); - -console.log("obj = " + obj); -//console.log("typeof obj = " + (typeof obj)); -//console.log("obj prototype = " + Object.prototype.toString.call(obj)); +//console.log("Data.putObject " + obj); if (obj == null) { // == Checks for null and undefined. this['putNULL'](); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/proton-c/bindings/javascript/my-library.js ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/my-library.js b/proton-c/bindings/javascript/my-library.js index b35ccd1..af89ef4 100644 --- a/proton-c/bindings/javascript/my-library.js +++ b/proton-c/bindings/javascript/my-library.js @@ -27,11 +27,52 @@ mergeInto(LibraryManager.library, { // Hacks below // ----------------------------------------------------------------------------------------------------------------- - $SOCKFS__postset: '__ATINIT__.push({ func: function() { SOCKFS.root = FS.mount(SOCKFS, {}, null); } });', $SOCKFS__deps: ['$FS'], $SOCKFS: { mount: function(mount) { + // If Module['websocket'] has already been defined (e.g. for configuring + // subprotocol/url) use that, if not initialise it to a new object. + Module['websocket'] = (Module['websocket'] && + ('object' === typeof Module['websocket'])) ? Module['websocket'] : {}; + + // Add Event registration mechanism to the exported websocket configuration + // object so we can register network callbacks from native JavaScript too. + Module['websocket']._callbacks = {}; + Module['websocket'].on = function(event, callback) { + if ('function' === typeof callback) { + this._callbacks[event] = callback; + } + return this; + }; + + Module['websocket'].emit = function(event, param) { + if ('function' === typeof this._callbacks[event]) { + this._callbacks[event].call(this, param); + } + }; + + // Register default null callbacks for each Event + Module['websocket'].on("error", function(error) { +console.log("Websocket error " + error); + }); + + Module['websocket'].on("open", function(fd) { +console.log("Websocket open fd = " + fd); + }); + + Module['websocket'].on("connection", function(fd) { +console.log("Websocket connection fd = " + fd); + }); + + Module['websocket'].on("message", function(fd) { +console.log("Websocket message fd = " + fd); + }); + + Module['websocket'].on("close", function(fd) { +console.log("Websocket close fd = " + fd); + }); + return FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 0777, 0); }, createSocket: function(family, type, protocol) { @@ -279,7 +320,7 @@ console.log("handleOpen triggering networkCallback"); Module['networkCallback'](); } - + Module['websocket'].emit('open', 10); }; @@ -317,6 +358,8 @@ console.log("handleMessage triggering networkCallback"); Module['networkCallback'](); } + Module['websocket'].emit('message', 10); + }; @@ -328,15 +371,24 @@ console.log("handleMessage triggering networkCallback"); } handleMessage((new Uint8Array(data)).buffer); // copy from node Buffer -> ArrayBuffer }); + peer.socket.on('close', function() { + Module['websocket'].emit('close', 10); + }); peer.socket.on('error', function(error) { -console.log('error ' + error); + Module['websocket'].emit('error', error); // don't throw }); } else { peer.socket.onopen = handleOpen; + peer.socket.onclose = function() { + Module['websocket'].emit('close', 10); + }; peer.socket.onmessage = function peer_socket_onmessage(event) { handleMessage(event.data); }; + peer.socket.onerror = function(error) { + Module['websocket'].emit('error', error); + }; } }, @@ -504,26 +556,24 @@ console.log('close'); SOCKFS.websocket_sock_ops.createPeer(sock, ws); } - - - - - if (Module['networkCallback']) { console.log("On connection triggering networkCallback"); Module['networkCallback'](); } + Module['websocket'].emit('connection', 10); }); sock.server.on('closed', function() { console.log('sock.server closed'); + Module['websocket'].emit('close', 10); sock.server = null; }); - sock.server.on('error', function() { + sock.server.on('error', function(error) { console.log('sock.server error'); + Module['websocket'].emit('error', error); // don't throw }); }, http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/tests/javascript/codec.js ---------------------------------------------------------------------- diff --git a/tests/javascript/codec.js b/tests/javascript/codec.js index 95e36e4..36156ae 100644 --- a/tests/javascript/codec.js +++ b/tests/javascript/codec.js @@ -30,7 +30,7 @@ if (typeof exports !== "undefined" && exports !== null) { proton = require("qpid-proton"); } -// We extend TestCase by creating an instance and adding test methods as properties. +// Extend TestCase by creating a prototype instance and adding test methods as properties. var DataTest = new unittest.TestCase(); DataTest.setUp = function() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
