The qpid-config port is largely complete except for XML binding support, though it still needs a bit of tidying up. Added a simple soak test that sends messages to and receives messages from a broker ad infinitum, and a simple HTML send-message example.
git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/branches/fadams-javascript-binding@1621865 13f79535-47bb-0310-9956-ffa450edef68 Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4a78327f Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4a78327f Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4a78327f Branch: refs/heads/master Commit: 4a78327f5a6f2369ef15ede360f4bc86d288f7c4 Parents: c3efc08 Author: fadams <fadams@unknown> Authored: Mon Sep 1 18:32:04 2014 +0000 Committer: fadams <fadams@unknown> Committed: Mon Sep 1 18:32:04 2014 +0000 ---------------------------------------------------------------------- examples/messenger/javascript/qpid-config.js | 227 +++++-- examples/messenger/javascript/send.html | 110 ++++ examples/messenger/javascript/send.js | 6 +- proton-c/bindings/javascript/CMakeLists.txt | 43 +- proton-c/bindings/javascript/binding.js | 39 +- proton-c/bindings/javascript/my-library.js | 755 ---------------------- tests/javascript/soak.js | 99 +++ 7 files changed, 448 insertions(+), 831 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/examples/messenger/javascript/qpid-config.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/qpid-config.js b/examples/messenger/javascript/qpid-config.js index 466f8b6..eb0951f 100755 --- a/examples/messenger/javascript/qpid-config.js +++ b/examples/messenger/javascript/qpid-config.js @@ -41,7 +41,11 @@ if (typeof exports !== "undefined" && exports !== null) { proton = require("qpid-proton"); } -var address = 'amqp://0.0.0.0:5673/qmf.default.direct'; +var addr = 'guest:guest@localhost:5673'; +//var addr = 'localhost:5673'; +var address = 'amqp://' + addr + '/qmf.default.direct'; +console.log(address); + var replyTo = ''; var subscription; var 
subscribed = false; @@ -52,13 +56,14 @@ var messenger = new proton.Messenger(); /** * The correlator object is a mechanism used to correlate requests with their * aynchronous responses. It might possible be better to make use of Promises - * to implement part of this behaviour but a mechanism would still be meeded to + * to implement part of this behaviour but a mechanism would still be needed to * correlate a request with its response callback in order to wrap things up in * a Promise, so much of the behaviour of this object would still be required. * In addition it seemed to make sense to make this QMF2 implementation fairly * free of dependencies and using Promises would require external libraries. - * Instead the correlator implements Promise-like semantics, you might call it + * Instead the correlator implements "Promise-like" semantics, you might call it * a broken Promise :-) + * <p> * in particular the request method behaves a *bit* like Promise.all() though it * is mostly fake and takes an array of functions that call the add() method * which is really the method used to associate response objects by correlationID. 
@@ -155,6 +160,7 @@ var getObjects = function(packageName, className) { message.setReplyTo(replyTo); message.setCorrelationID(className); message.properties = { + "routing-key": "broker", // Added for Java Broker "x-amqp-0-10.app-id": "qmf2", "method": "request", "qmf.opcode": "_query_request", @@ -178,6 +184,7 @@ var invokeMethod = function(object, method, arguments) { message.setReplyTo(replyTo); message.setCorrelationID(correlationID); message.properties = { + "routing-key": "broker", // Added for Java Broker "x-amqp-0-10.app-id": "qmf2", "method": "request", "qmf.opcode": "_method_request", @@ -197,7 +204,7 @@ messenger.on('work', pumpData); messenger.setOutgoingWindow(1024); messenger.start(); -subscription = messenger.subscribe('amqp://0.0.0.0:5673/#'); +subscription = messenger.subscribe('amqp://' + addr + '/#'); messenger.recv(); // Receive as many messages as messenger can buffer. @@ -358,11 +365,24 @@ var _options = ' output\n'; var REPLICATE_LEVELS = {"none" : true, "configuration": true, "all": true}; -var DEFAULT_PROPERTIES = {"exchange":["name", "type", "durable"], "queue":["name", "durable", "autoDelete"]}; +var DEFAULT_PROPERTIES = {"exchange": {"name": true, "type": true, "durable": true}, + "queue": {"name": true, "durable": true, "autoDelete": true}}; + +var getValue = function(r) { + var value = null; + if (r.length === 2) { + value = r[1]; + if (!isNaN(value)) { + value = parseInt(value); + } + } + + return value; +}; var config = { _recursive : false, - _host : 'localhost', + _host : 'localhost:5673', // Note 5673 not 5672 as we use WebSocket transport. 
_connTimeout : 10, _ignoreDefault : false, _altern_ex : null, @@ -389,17 +409,13 @@ var config = { _extra_arguments: [], _start_replica : null, _returnCode : 0, - _list_properties: [], + _list_properties: null, getOptions: function() { var options = {}; for (var a = 0; a < this._extra_arguments.length; a++) { var r = this._extra_arguments[a].split('='); - var value = null; - if (r.length === 2) { - value = r[1]; - } - options[r[0]] = value; + options[r[0]] = getValue(r); } return options; } @@ -471,7 +487,7 @@ var idMap = function(list) { return map; }; -var renderArguments = function(obj, list) { +var renderObject = function(obj, list) { if (!obj) { return ''; } @@ -493,7 +509,7 @@ var renderArguments = function(obj, list) { } if (addComma) { - return ' {' + string + '}'; + return '{' + string + '}'; } else { if (list) { return string; @@ -645,7 +661,7 @@ var exchangeListRecurse = function(filter) { var queue = queues[oid(bind.queueRef)]; var queueName = queue ? queue._values.name : "<unknown>"; console.log(" bind [" + bind.bindingKey + "] => " + queueName + - renderArguments(bind.arguments)); + " " + renderObject(bind.arguments)); } } } @@ -743,7 +759,7 @@ var queueList = function(filter) { if (args[SHARED_MSG_GROUP] === 1) { string += ' --shared-groups'; } - string += renderArguments(args, true); + string += ' ' + renderObject(args, true); console.log(string); } } @@ -785,7 +801,7 @@ var queueListRecurse = function(filter) { } console.log(" bind [" + bind.bindingKey + "] => " + exchangeName + - renderArguments(bind.arguments)); + " " + renderObject(bind.arguments)); } } } @@ -824,10 +840,10 @@ console.log("Method result"); if (response._arguments) { //console.log(response._arguments); } if (response._values) { - console.error("Exception from Agent: " + renderArguments(response._values)); + console.error("Exception from Agent: " + renderObject(response._values)); } // Mostly we want to stop the Messenger Event loop and exit when a QMF method - // call returns, 
but sometimes we don't. + // returns, but sometimes we don't, the dontStop flag prevents this behaviour. if (!dontStop) { messenger.stop(); } @@ -846,11 +862,7 @@ var addExchange = function(args) { for (var a = 0; a < config._extra_arguments.length; a++) { var r = config._extra_arguments[a].split('='); - var value = null; - if (r.length === 2) { - value = r[1]; - } - declArgs[r[0]] = value; + declArgs[r[0]] = getValue(r); } if (config._msgSequence) { @@ -918,11 +930,7 @@ var addQueue = function(args) { for (var a = 0; a < config._extra_arguments.length; a++) { var r = config._extra_arguments[a].split('='); - var value = null; - if (r.length === 2) { - value = r[1]; - } - declArgs[r[0]] = value; + declArgs[r[0]] = getValue(r); } if (config._durable) { @@ -1201,21 +1209,141 @@ console.log(args); * The following methods are "generic" create and delete methods to for arbitrary * Management Objects e.g. Incoming, Outgoing, Domain, Topic, QueuePolicy, * TopicPolicy etc. use --argument k1=v1 --argument k2=v2 --argument k3=v3 to - * pass arbitrary arguments as key/value pairs to the Object being created/deleted. + * pass arbitrary arguments as key/value pairs to the Object being created/deleted, + * for example to add a topic object that uses the fanout exchange: + * ./qpid-config.js add topic fanout --argument exchange=amq.fanout \ + * --argument qpid.max_size=1000000 --argument qpid.policy_type=ring */ var createObject = function(type, name, args) { -console.log("createObject"); -console.log(type); -console.log(name); -console.log(args); + correlator.request( + // We invoke the CRUD methods on the broker object. + getObjects('org.apache.qpid.broker', 'broker') + ).then(function(objects) { + var broker = objects.broker[0]; + correlator.request( + // Create an object of the specified type. 
+ invokeMethod(broker, 'create', { + "type": type, + "name": name, + "properties": args, + "strict": true}) + ).then(handleMethodResponse); + }); +}; +var deleteObject = function(type, name, args) { + correlator.request( + // We invoke the CRUD methods on the broker object. + getObjects('org.apache.qpid.broker', 'broker') + ).then(function(objects) { + var broker = objects.broker[0]; + correlator.request( + // Create an object of the specified type and name. + invokeMethod(broker, 'delete', { + "type": type, + "name": name, + "options": args}) + ).then(handleMethodResponse); + }); }; -var deleteObject = function(args) { -console.log("deleteObject"); -console.log(args); +/** + * This is a "generic" mechanism for listing arbitrary Management Objects. + */ +var listObjects = function(type) { + correlator.request( + getObjects('org.apache.qpid.broker', type) + ).then(function(objects) { + // The correlator passes an object containing responses for all of the + // supplied requests so we index it by the supplied type to get our response. + objects = objects[type]; + + // Collect available attributes, stringify the values and compute the max + // length of the value of each attribute so that we can later create a table. + var attributes = {}; + var lengths = {}; + for (var i = 0; i < objects.length; i++) { + var object = objects[i]; + object = object._values; + for (var prop in object) { + if (typeof object[prop] === 'object') { // Stringify Object properties. + // Check if property is an ObjectID (reference property), + // if so replace with the "name" part of the OID. + if (object[prop]['_object_name']) { + var parts = object[prop]['_object_name'].split(':'); + object[prop] = parts[parts.length - 1]; + } else { + // Stringify general Object properties. + object[prop] = renderObject(object[prop]); + } + } else { + object[prop] = object[prop].toString(); // Stringify other property types. 
+ } + + if (!lengths[prop] || object[prop].length > lengths[prop]) { // Compute lengths. + lengths[prop] = object[prop].length > prop.length ? object[prop].length : prop.length; + } + + if (!config._list_properties || config._list_properties[prop]) { // Do we want this property? + attributes[prop] = true; + } + } + } + if (!config._list_properties && DEFAULT_PROPERTIES[type]) { + attributes = DEFAULT_PROPERTIES[type]; + } + + // Using the information we've previously prepared now render a table + // showing the required property values. + var desired = []; + var header = ''; // Table header showing the property names. + if (attributes['name']) { + desired.push('name'); + delete attributes['name']; + header += 'name' + Array(lengths['name'] + 2 - 4).join(' '); + } + + for (var prop in attributes) { + desired.push(prop); + header += prop + Array(lengths[prop] + 2 - prop.length).join(' '); + } + + console.log("Objects of type '" + type + "'"); + console.log(header); + console.log(Array(header.length).join('=')); + for (var i = 0; i < objects.length; i++) { + var object = objects[i]; + object = object._values; + var string = ''; + for (var j = 0; j < desired.length; j++) { + var key = desired[j]; + string += object[key] + Array(lengths[key] + 2 - object[key].length).join(' '); + } + + console.log(string); + } + + messenger.stop(); + }); +}; + +var reloadAcl = function() { + correlator.request( + getObjects('org.apache.qpid.acl', 'acl') + ).then(function(objects) { + if (objects.acl.length > 0) { + var acl = objects.acl[0]; + correlator.request( + // Create an object of the specified type. 
+ invokeMethod(acl, 'reloadACLFile', {}) + ).then(handleMethodResponse); + } else { + console.log("Failed: No ACL Loaded in Broker"); + messenger.stop(); + } + }); }; @@ -1263,10 +1391,9 @@ if (args.length > 0) { if (config._connTimeout === 0) { config._connTimeout = null; } - } else if (arg === '-b' || arg === '--broker' || arg === '-b' || arg === '--broker-addr') { - config._host = val; - if (config._host == null) { - config._host = 'localhost:5672'; + } else if (arg === '-b' || arg === '--broker' || arg === '-a' || arg === '--broker-addr') { + if (val != null) { + config._host = val; } } else if (arg === '--alternate-exchange') { config._altern_ex = val; @@ -1306,7 +1433,10 @@ if (args.length > 0) { } else if (arg === '--f' || arg === '--file') { // TODO Won't work in node.js config._file = val; } else if (arg === '--show-property') { - config._list_properties = val; + if (config._list_properties === null) { + config._list_properties = {}; + } + config._list_properties[val] = true; } } else { params.push(arg); @@ -1316,9 +1446,6 @@ if (args.length > 0) { config._extra_arguments = extra_arguments; -console.log("params"); -console.log(params); - // The command only *actually* gets called when the QMF connection has actually // been established so we wrap up the function we want to get called in a lambda. 
var command = function() {overview();}; @@ -1365,11 +1492,19 @@ if (params.length > 0) { command = function() {bind(Array.prototype.slice.apply(params, [1]));}; } else if (cmd === 'unbind') { command = function() {unbind(Array.prototype.slice.apply(params, [1]));}; + } else if (cmd === 'reload-acl') { + command = function() {reloadAcl();}; + } else if (cmd === 'list' && params.length > 1) { + command = function() {listObjects(modifier);}; + } else { + usage(); } } +//console.log(config._host); + + var onSubscription = function() { command(); }; - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/examples/messenger/javascript/send.html ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/send.html b/examples/messenger/javascript/send.html new file mode 100644 index 0000000..b6aaef2 --- /dev/null +++ b/examples/messenger/javascript/send.html @@ -0,0 +1,110 @@ +<!DOCTYPE html> <!-- HTML5 doctype --> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<html> + +<head> + <title>Simple Proton Messenger Send Example</title> + <meta http-equiv="content-type" content="text/html;charset=utf-8" /> + +<!-- + Import JavaScript Messenger Binding proton.js. 
Note that this simple example pulls + it from the node_modules/qpid-proton/lib which is created by the build process + so that the node.js based examples "just work", in a real Web App you would need + to copy the proton.js to your own server. + In actual fact the CMake build actually builds proton.js into the directory: + <build>/proton-c/bindings/javascript + where <build> is the build directory created to run cmake from. +--> +<script type="text/javascript" src="../../../node_modules/qpid-proton/lib/proton.js"></script> + +<script type="text/javascript"> + +var message = new proton.Message(); +var messenger = new proton.Messenger(); + +var sendMessage = function() { + var address = document.getElementById("address").value; + var subject = document.getElementById("subject").value; + var body = document.getElementById("body").value; + +console.log("sendMessage"); +console.log("address = " + address); +console.log("subject = " + subject); +console.log("body = " + body); + + message.setAddress(address); + message.setSubject(subject); + message.body = body; + + messenger.put(message); +}; + +messenger.on('error', function(error) {console.log("Received error " + error);}); +messenger.start(); + +</script> + +<style> +body +{ + font: 13px/1.5 Helvetica, Arial, 'Liberation Sans', FreeSans, sans-serif; + overflow-x: hidden; /* Hide horizontal scrollbar */ + background: #dddddd; +} + +label +{ + display: block; + font-size: 17px; +} + +input, textarea +{ + font-size: 13px; + margin-bottom: 10px; +} +</style> + +</head> + +<body> +<div> + <label for="address">Address:</label> + <input type="text" id="address" size="40" + placeholder="amqp://user:password@host:port" + name="address" value="amqp://guest:[email protected]" /> +</div> +<div> + <label for="subject">Subject:</label> + <input type="text" id="subject" size="40" + name="subject" value="Browser Message" /> +</div> +<div> + <label for="body">Message:</label> + <textarea id="body" name="body" rows="4" cols="40">Hello 
From Browser!</textarea> +</div> +<div> + <input type="button" value="send" onclick="sendMessage()"/> +</div> +</body> + +</html> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/examples/messenger/javascript/send.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/send.js b/examples/messenger/javascript/send.js index 77a605c..5a93333 100644 --- a/examples/messenger/javascript/send.js +++ b/examples/messenger/javascript/send.js @@ -33,6 +33,10 @@ var running = true; var message = new proton.Message(); var messenger = new proton.Messenger(); +// Because this is an asynchronous send we can't simply call messenger.put(message) +// then exit. The following callback function (and messenger.setOutgoingWindow()) +// gives us a means to wait until the consumer has received the message before +// exiting. The recv.js example explicitly accepts messages it receives. var pumpData = function() { var status = messenger.status(tracker); if (status != proton.Status.PENDING) { @@ -81,7 +85,7 @@ console.log("Content: " + msgtext); messenger.on('error', function(error) {console.log(error);}); messenger.on('work', pumpData); -messenger.setOutgoingWindow(1024); +messenger.setOutgoingWindow(1024); // So we can track status of send message. messenger.start(); message.setAddress(address); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/proton-c/bindings/javascript/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/CMakeLists.txt b/proton-c/bindings/javascript/CMakeLists.txt index 4e4dc0f..ae24407 100644 --- a/proton-c/bindings/javascript/CMakeLists.txt +++ b/proton-c/bindings/javascript/CMakeLists.txt @@ -19,7 +19,7 @@ # This file allows cross-compiling of proton to JavaScript using emscripten # (https://github.com/kripken/emscripten). 
As it is really a cross-compilation -# (as opposed to a binding a la swig) the approach is rather different and +# (as opposed to a binding like with swig) the approach is rather different and # somewhat replicates the main build in the proton-c/CMakeLists.txt using quite # a bit of "copy and paste reuse". # TODO refactor this file (and proton-c/CMakeLists.txt) to keep the main @@ -27,8 +27,8 @@ message(STATUS "Found emscripten, using that to build JavaScript binding") -# Find and install node.js packages that we might need. We can assume that -# node.js is installed because Emscripten has a dependency on it. +# Find and install the node.js packages that we might need. We can assume that +# node.js itself is installed because Emscripten has a dependency on it. find_package(NodePackages) # Describe the target OS we are building to - Emscripten mimics the Linux platform. @@ -43,6 +43,14 @@ set(CMAKE_C_COMPILER "${EMCC}") include(CMakeForceCompiler) CMAKE_FORCE_C_COMPILER("${CMAKE_C_COMPILER}" Clang) +if (CMAKE_BUILD_TYPE STREQUAL "Debug" OR CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo") + message(STATUS "DEBUG JavaScript build") +else() + message(STATUS "RELEASE JavaScript build") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3") + set(EMSCRIPTEN_LINK_OPTIMISATIONS "-O2 --closure 1") +endif() + # From this point we should be using emscripten compilation tools. message(STATUS "emscripten compilation environment:") message(STATUS "CMAKE_C_COMPILER: ${CMAKE_C_COMPILER}") @@ -193,21 +201,9 @@ set_target_properties( RUNTIME_OUTPUT_DIRECTORY examples DEPENDS ws - # This build shows socket messages - useful for debugging. - #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -s SOCKET_DEBUG=1" - - # Optimised build - takes somewhat longer to build. - #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1" - - # This build shows up emscripten warnings when building - should be able to remove it. 
- #LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -s VERBOSE=1 -O2" - - # This build is optimised but not minified - LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2" + LINK_FLAGS "-s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -${EMSCRIPTEN_LINK_OPTIMISATIONS}" ) - - # Build the main JavaScript library called proton.js add_executable(proton.js binding.c) target_link_libraries(proton.js qpid-proton-bitcode) @@ -217,16 +213,13 @@ set_target_properties( PROPERTIES COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_PLATFORM_FLAGS}" - # This build is optimised and minified. The --memory-init-file 0 stops emscripten - # emitting a separate memory initialization file, if this was enabled it makes - # packaging harder as applications would expect proton.js.mem to be served too. - # It's even more fiddly with node.js packages. This behaviour might be reinstated - # if the packaging mechanism improves. - - # --js-library ${CMAKE_CURRENT_SOURCE_DIR}/my-library.js - #LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1 --memory-init-file 0 --pre-js + # The --memory-init-file 0 stops emscripten emitting a separate memory + # initialization file, if this was enabled it makes packaging harder as + # applications would expect proton.js.mem to be served too. It's even more + # fiddly with node.js packages. This behaviour might be reinstated if the + # packaging mechanism improves. 
- LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" -O2 --closure 1 --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_test', '_uuid_generate', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_work', '_pn_messenger_recv', '_pn_messenger_receiving', ' _pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming', '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_subscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_messa 
ge_get_reply_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_set_creation_time', '_pn_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_pn_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_deci 
mal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump']\"" + LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" ${EMSCRIPTEN_LINK_OPTIMISATIONS} --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_work', '_pn_messenger_recv', '_pn_messenger_receiving', '_pn_messen ger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming', '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_subscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', 
'_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_message_get_rep ly_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_set_creation_time', '_pn_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data _put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_pn_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', 
'_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_decimal64', '_ pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump']\"" ) # This command packages up the compiled proton.js into a node.js package called http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/proton-c/bindings/javascript/binding.js ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/binding.js b/proton-c/bindings/javascript/binding.js index eceda54..4caafa6 100644 --- a/proton-c/bindings/javascript/binding.js +++ b/proton-c/bindings/javascript/binding.js @@ -477,6 +477,14 @@ _Messenger_['setIncomingWindow'] = function(window) { */ _Messenger_['start'] = function() { this._check(_pn_messenger_start(this._messenger)); + + // This call ensures that the emscripten network callback functions are set + // up even if a client hasn't explicity added a work function via a call to + // messenger.on('work', <work function>); + // Doing this means that pn_messenger_work() will still get called when any + // WebSocket events occur, which keeps things more reliable when things like + // reconnections occur. + Module.EventDispatch.addListener(this); }; /** @@ -541,8 +549,7 @@ _Messenger_['subscribe'] = function(source) { * Places the content contained in the message onto the outgoing queue * of the Messenger. This method will never block, however it will send any * unblocked Messages in the outgoing queue immediately and leave any blocked - * Messages remaining in the outgoing queue. The send call may be used to - * block until the outgoing queue is empty. The outgoing property may be + * Messages remaining in the outgoing queue. The outgoing property may be * used to check the depth of the outgoing queue. 
* <p> * When the content in a given Message object is copied to the outgoing @@ -554,13 +561,23 @@ _Messenger_['subscribe'] = function(source) { * @method put * @memberof! proton.Messenger# * @param {proton.Message} message a Message to send. + * @param {boolean} flush if this is set true or is undefined then messages are + * flushed (this is the default). If explicitly set to false then messages + * may not be sent immediately and might require an explicit call to work(). + * This may be used to "batch up" messages and *may* be more efficient. * @returns {proton.Data.Long} a tracker. */ -_Messenger_['put'] = function(message) { +_Messenger_['put'] = function(message, flush) { + flush = flush === false ? false : true; message._preEncode(); this._checkErrors = true; this._check(_pn_messenger_put(this._messenger, message._message)); + // If flush is set invoke pn_messenger_work. + if (flush) { + _pn_messenger_work(this._messenger, 0); + } + // Getting the tracker is a little tricky as it is a 64 bit number. The way // emscripten handles this is to return the low 32 bits directly and pass // the high 32 bits via the tempRet0 variable. We use Data.Long to pass the @@ -578,6 +595,12 @@ _Messenger_['put'] = function(message) { * @returns {proton.Status} one of None, PENDING, REJECTED, or ACCEPTED. */ _Messenger_['status'] = function(tracker) { + if (tracker == null) { + var low = _pn_messenger_outgoing_tracker(this._messenger); + var high = Runtime.getTempRet0(); + tracker = new Data.Long(low, high); + } + return _pn_messenger_status(this._messenger, tracker.getLowBitsUnsigned(), tracker.getHighBits()); }; @@ -589,6 +612,12 @@ _Messenger_['status'] = function(tracker) { * @returns {boolean} true if delivery is still buffered. 
*/ _Messenger_['isBuffered'] = function(tracker) { + if (tracker == null) { + var low = _pn_messenger_outgoing_tracker(this._messenger); + var high = Runtime.getTempRet0(); + tracker = new Data.Long(low, high); + } + return (_pn_messenger_buffered(this._messenger, tracker.getLowBitsUnsigned(), tracker.getHighBits()) > 0); }; @@ -982,7 +1011,9 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl }; } - _messengers[name].callbacks.push(callback); + if (callback) { + _messengers[name].callbacks.push(callback); + } }; /** http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/proton-c/bindings/javascript/my-library.js ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/my-library.js b/proton-c/bindings/javascript/my-library.js deleted file mode 100644 index af89ef4..0000000 --- a/proton-c/bindings/javascript/my-library.js +++ /dev/null @@ -1,755 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - - -mergeInto(LibraryManager.library, { -// Add to main emscripten library.js - - -// Hacks below -// ----------------------------------------------------------------------------------------------------------------- - - $SOCKFS__postset: '__ATINIT__.push({ func: function() { SOCKFS.root = FS.mount(SOCKFS, {}, null); } });', - $SOCKFS__deps: ['$FS'], - $SOCKFS: { - mount: function(mount) { - // If Module['websocket'] has already been defined (e.g. for configuring - // subprotocol/url) use that, if not initialise it to a new object. - Module['websocket'] = (Module['websocket'] && - ('object' === typeof Module['websocket'])) ? Module['websocket'] : {}; - - // Add Event registration mechanism to the exported websocket configuration - // object so we can register network callbacks from native JavaScript too. - Module['websocket']._callbacks = {}; - Module['websocket'].on = function(event, callback) { - if ('function' === typeof callback) { - this._callbacks[event] = callback; - } - return this; - }; - - Module['websocket'].emit = function(event, param) { - if ('function' === typeof this._callbacks[event]) { - this._callbacks[event].call(this, param); - } - }; - - // Register default null callbacks for each Event - Module['websocket'].on("error", function(error) { -console.log("Websocket error " + error); - }); - - Module['websocket'].on("open", function(fd) { -console.log("Websocket open fd = " + fd); - }); - - Module['websocket'].on("connection", function(fd) { -console.log("Websocket connection fd = " + fd); - }); - - Module['websocket'].on("message", function(fd) { -console.log("Websocket message fd = " + fd); - }); - - Module['websocket'].on("close", function(fd) { -console.log("Websocket close fd = " + fd); - }); - - return FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 0777, 0); - }, - createSocket: function(family, type, protocol) { - var streaming = type == {{{ cDefine('SOCK_STREAM') }}}; - if (protocol) { - assert(streaming == (protocol == {{{ 
cDefine('IPPROTO_TCP') }}})); // if SOCK_STREAM, must be tcp - } - - // create our internal socket structure - var sock = { - family: family, - type: type, - protocol: protocol, - server: null, - peers: {}, - pending: [], - recv_queue: [], -#if SOCKET_WEBRTC -#else - sock_ops: SOCKFS.websocket_sock_ops -#endif - }; - - // create the filesystem node to store the socket structure - var name = SOCKFS.nextname(); - var node = FS.createNode(SOCKFS.root, name, {{{ cDefine('S_IFSOCK') }}}, 0); - node.sock = sock; - - // and the wrapping stream that enables library functions such - // as read and write to indirectly interact with the socket - var stream = FS.createStream({ - path: name, - node: node, - flags: FS.modeStringToFlags('r+'), - seekable: false, - stream_ops: SOCKFS.stream_ops - }); - - // map the new stream to the socket structure (sockets have a 1:1 - // relationship with a stream) - sock.stream = stream; - - return sock; - }, - getSocket: function(fd) { - var stream = FS.getStream(fd); - if (!stream || !FS.isSocket(stream.node.mode)) { - return null; - } - return stream.node.sock; - }, - // node and stream ops are backend agnostic - stream_ops: { - poll: function(stream) { - var sock = stream.node.sock; - return sock.sock_ops.poll(sock); - }, - ioctl: function(stream, request, varargs) { -console.log('stream_ops.ioctl'); - var sock = stream.node.sock; - return sock.sock_ops.ioctl(sock, request, varargs); - }, - read: function(stream, buffer, offset, length, position /* ignored */) { - var sock = stream.node.sock; - var msg = sock.sock_ops.recvmsg(sock, length); - if (!msg) { - // socket is closed - return 0; - } -#if USE_TYPED_ARRAYS == 2 - buffer.set(msg.buffer, offset); -#else - for (var i = 0; i < size; i++) { - buffer[offset + i] = msg.buffer[i]; - } -#endif - return msg.buffer.length; - }, - write: function(stream, buffer, offset, length, position /* ignored */) { - var sock = stream.node.sock; - return sock.sock_ops.sendmsg(sock, buffer, offset, length); 
- }, - close: function(stream) { -console.log('stream_ops.close'); - var sock = stream.node.sock; - sock.sock_ops.close(sock); - } - }, - nextname: function() { - if (!SOCKFS.nextname.current) { - SOCKFS.nextname.current = 0; - } - return 'socket[' + (SOCKFS.nextname.current++) + ']'; - }, - // backend-specific stream ops - websocket_sock_ops: { - // - // peers are a small wrapper around a WebSocket to help in - // emulating dgram sockets - // - // these functions aren't actually sock_ops members, but we're - // abusing the namespace to organize them - // - createPeer: function(sock, addr, port) { - var ws; - - if (typeof addr === 'object') { - ws = addr; - addr = null; - port = null; - } - - if (ws) { - // for sockets that've already connected (e.g. we're the server) - // we can inspect the _socket property for the address - if (ws._socket) { - addr = ws._socket.remoteAddress; - port = ws._socket.remotePort; - } - // if we're just now initializing a connection to the remote, - // inspect the url property - else { - var result = /ws[s]?:\/\/([^:]+):(\d+)/.exec(ws.url); - if (!result) { - throw new Error('WebSocket URL must be in the format ws(s)://address:port'); - } - addr = result[1]; - port = parseInt(result[2], 10); - } - } else { - // Create the actual websocket object and connect. - try { - // runtimeConfig gets set to true if WebSocket runtime configuration is available. - var runtimeConfig = (Module['websocket'] && ('object' === typeof Module['websocket'])); - - // The default value is 'ws://' the replace is needed because the compiler replaces "//" comments with '#' - // comments without checking context, so we'd end up with ws:#, the replace swaps the "#" for "//" again. - var url = '{{{ WEBSOCKET_URL }}}'.replace('#', '//'); - - if (runtimeConfig) { - if ('string' === typeof Module['websocket']['url']) { - url = Module['websocket']['url']; // Fetch runtime WebSocket URL config. 
- } - } - - if (url === 'ws://' || url === 'wss://') { // Is the supplied URL config just a prefix, if so complete it. - url = url + addr + ':' + port; - } - - // Make the WebSocket subprotocol (Sec-WebSocket-Protocol) default to binary if no configuration is set. - var subProtocols = '{{{ WEBSOCKET_SUBPROTOCOL }}}'; // The default value is 'binary' - - if (runtimeConfig) { - if ('string' === typeof Module['websocket']['subprotocol']) { - subProtocols = Module['websocket']['subprotocol']; // Fetch runtime WebSocket subprotocol config. - } - } - - // The regex trims the string (removes spaces at the beginning and end, then splits the string by - // <any space>,<any space> into an Array. Whitespace removal is important for Websockify and ws. - subProtocols = subProtocols.replace(/^ +| +$/g,"").split(/ *, */); - - // The node ws library API for specifying optional subprotocol is slightly different than the browser's. - var opts = ENVIRONMENT_IS_NODE ? {'protocol': subProtocols.toString()} : subProtocols; - -#if SOCKET_DEBUG - Module.print('connect: ' + url + ', ' + subProtocols.toString()); -#endif - // If node we use the ws library. - var WebSocket = ENVIRONMENT_IS_NODE ? require('ws') : window['WebSocket']; - ws = new WebSocket(url, opts); - ws.binaryType = 'arraybuffer'; - } catch (e) { -console.log('e: ' + e); - throw new FS.ErrnoError(ERRNO_CODES.EHOSTUNREACH); - } - } - -#if SOCKET_DEBUG - Module.print('websocket adding peer: ' + addr + ':' + port); -#endif - - var peer = { - addr: addr, - port: port, - socket: ws, - dgram_send_queue: [] - }; - - SOCKFS.websocket_sock_ops.addPeer(sock, peer); - SOCKFS.websocket_sock_ops.handlePeerEvents(sock, peer); - - // if this is a bound dgram socket, send the port number first to allow - // us to override the ephemeral port reported to us by remotePort on the - // remote end. 
- if (sock.type === {{{ cDefine('SOCK_DGRAM') }}} && typeof sock.sport !== 'undefined') { -#if SOCKET_DEBUG - Module.print('websocket queuing port message (port ' + sock.sport + ')'); -#endif - peer.dgram_send_queue.push(new Uint8Array([ - 255, 255, 255, 255, - 'p'.charCodeAt(0), 'o'.charCodeAt(0), 'r'.charCodeAt(0), 't'.charCodeAt(0), - ((sock.sport & 0xff00) >> 8) , (sock.sport & 0xff) - ])); - } - - return peer; - }, - getPeer: function(sock, addr, port) { - return sock.peers[addr + ':' + port]; - }, - addPeer: function(sock, peer) { - sock.peers[peer.addr + ':' + peer.port] = peer; - }, - removePeer: function(sock, peer) { - delete sock.peers[peer.addr + ':' + peer.port]; - }, - handlePeerEvents: function(sock, peer) { - var first = true; - - var handleOpen = function () { -#if SOCKET_DEBUG - Module.print('websocket handle open'); -#endif - try { - var queued = peer.dgram_send_queue.shift(); - while (queued) { -#if SOCKET_DEBUG - Module.print('websocket sending queued data (' + queued.byteLength + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(queued))]); -#endif - peer.socket.send(queued); - queued = peer.dgram_send_queue.shift(); - } - } catch (e) { - // not much we can do here in the way of proper error handling as we've already - // lied and said this data was sent. shut it down. 
- peer.socket.close(); - } - - - - if (Module['networkCallback']) { -console.log("handleOpen triggering networkCallback"); - - Module['networkCallback'](); - } - - Module['websocket'].emit('open', 10); - - }; - - function handleMessage(data) { - assert(typeof data !== 'string' && data.byteLength !== undefined); // must receive an ArrayBuffer - data = new Uint8Array(data); // make a typed array view on the array buffer - -#if SOCKET_DEBUG - //Module.print('websocket handle message (' + data.byteLength + ' bytes): ' + [Array.prototype.slice.call(data)]); - //Module.print('websocket handle message (' + data.byteLength + ' bytes)'); -#endif - - // if this is the port message, override the peer's port with it - var wasfirst = first; - first = false; - if (wasfirst && - data.length === 10 && - data[0] === 255 && data[1] === 255 && data[2] === 255 && data[3] === 255 && - data[4] === 'p'.charCodeAt(0) && data[5] === 'o'.charCodeAt(0) && data[6] === 'r'.charCodeAt(0) && data[7] === 't'.charCodeAt(0)) { - // update the peer's port and it's key in the peer map - var newport = ((data[8] << 8) | data[9]); - SOCKFS.websocket_sock_ops.removePeer(sock, peer); - peer.port = newport; - SOCKFS.websocket_sock_ops.addPeer(sock, peer); - return; - } - - sock.recv_queue.push({ addr: peer.addr, port: peer.port, data: data }); - - - - if (Module['networkCallback']) { -console.log("handleMessage triggering networkCallback"); - - Module['networkCallback'](); - } - - Module['websocket'].emit('message', 10); - - - }; - - if (ENVIRONMENT_IS_NODE) { - peer.socket.on('open', handleOpen); - peer.socket.on('message', function(data, flags) { - if (!flags.binary) { - return; - } - handleMessage((new Uint8Array(data)).buffer); // copy from node Buffer -> ArrayBuffer - }); - peer.socket.on('close', function() { - Module['websocket'].emit('close', 10); - }); - peer.socket.on('error', function(error) { - Module['websocket'].emit('error', error); - // don't throw - }); - } else { - peer.socket.onopen = 
handleOpen; - peer.socket.onclose = function() { - Module['websocket'].emit('close', 10); - }; - peer.socket.onmessage = function peer_socket_onmessage(event) { - handleMessage(event.data); - }; - peer.socket.onerror = function(error) { - Module['websocket'].emit('error', error); - }; - } - }, - - // - // actual sock ops - // - poll: function(sock) { - if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) { - // listen sockets should only say they're available for reading - // if there are pending clients. - return sock.pending.length ? ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}}) : 0; - } - - var mask = 0; - var dest = sock.type === {{{ cDefine('SOCK_STREAM') }}} ? // we only care about the socket state for connection-based sockets - SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport) : - null; - - if (sock.recv_queue.length || - !dest || // connection-less sockets are always ready to read - (dest && dest.socket.readyState === dest.socket.CLOSING) || - (dest && dest.socket.readyState === dest.socket.CLOSED)) { // let recv return 0 once closed - mask |= ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}}); - } - - if (!dest || // connection-less sockets are always ready to write - (dest && dest.socket.readyState === dest.socket.OPEN)) { - mask |= {{{ cDefine('POLLOUT') }}}; - } - - if ((dest && dest.socket.readyState === dest.socket.CLOSING) || - (dest && dest.socket.readyState === dest.socket.CLOSED)) { - mask |= {{{ cDefine('POLLHUP') }}}; - } - return mask; - }, - ioctl: function(sock, request, arg) { -console.log('ioctl'); - switch (request) { - case {{{ cDefine('FIONREAD') }}}: - var bytes = 0; - if (sock.recv_queue.length) { - bytes = sock.recv_queue[0].data.length; - } - {{{ makeSetValue('arg', '0', 'bytes', 'i32') }}}; - return 0; - default: - return ERRNO_CODES.EINVAL; - } - }, - close: function(sock) { -console.log('close'); - // if we've spawned a listen server, close it - if (sock.server) { - try { - 
sock.server.close(); - } catch (e) { - } - sock.server = null; - } - // close any peer connections - var peers = Object.keys(sock.peers); - for (var i = 0; i < peers.length; i++) { - var peer = sock.peers[peers[i]]; - try { - peer.socket.close(); - } catch (e) { - } - SOCKFS.websocket_sock_ops.removePeer(sock, peer); - } - return 0; - }, - bind: function(sock, addr, port) { - if (typeof sock.saddr !== 'undefined' || typeof sock.sport !== 'undefined') { - throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already bound - } - sock.saddr = addr; - sock.sport = port || _mkport(); - // in order to emulate dgram sockets, we need to launch a listen server when - // binding on a connection-less socket - // note: this is only required on the server side - if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) { - // close the existing server if it exists - if (sock.server) { - sock.server.close(); - sock.server = null; - } - // swallow error operation not supported error that occurs when binding in the - // browser where this isn't supported - try { - sock.sock_ops.listen(sock, 0); - } catch (e) { - if (!(e instanceof FS.ErrnoError)) throw e; - if (e.errno !== ERRNO_CODES.EOPNOTSUPP) throw e; - } - } - }, - connect: function(sock, addr, port) { - if (sock.server) { - throw new FS.ErrnoError(ERRNO_CODS.EOPNOTSUPP); - } - - // TODO autobind - // if (!sock.addr && sock.type == {{{ cDefine('SOCK_DGRAM') }}}) { - // } - - // early out if we're already connected / in the middle of connecting - if (typeof sock.daddr !== 'undefined' && typeof sock.dport !== 'undefined') { - var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport); - if (dest) { - if (dest.socket.readyState === dest.socket.CONNECTING) { - throw new FS.ErrnoError(ERRNO_CODES.EALREADY); - } else { - throw new FS.ErrnoError(ERRNO_CODES.EISCONN); - } - } - } - - // add the socket to our peer list and set our - // destination address / port to match - var peer = SOCKFS.websocket_sock_ops.createPeer(sock, addr, 
port); - sock.daddr = peer.addr; - sock.dport = peer.port; - - // always "fail" in non-blocking mode - throw new FS.ErrnoError(ERRNO_CODES.EINPROGRESS); - }, - listen: function(sock, backlog) { - if (!ENVIRONMENT_IS_NODE) { - throw new FS.ErrnoError(ERRNO_CODES.EOPNOTSUPP); - } - if (sock.server) { - throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already listening - } - - var WebSocketServer = require('ws').Server; - var host = sock.saddr; -#if SOCKET_DEBUG - console.log('listen: ' + host + ':' + sock.sport); -#endif - sock.server = new WebSocketServer({ - host: host, - port: sock.sport - // TODO support backlog - }); - - sock.server.on('connection', function(ws) { -#if SOCKET_DEBUG - console.log('received connection from: ' + ws._socket.remoteAddress + ':' + ws._socket.remotePort); -#endif - if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) { - var newsock = SOCKFS.createSocket(sock.family, sock.type, sock.protocol); - - // create a peer on the new socket - var peer = SOCKFS.websocket_sock_ops.createPeer(newsock, ws); - newsock.daddr = peer.addr; - newsock.dport = peer.port; - - // push to queue for accept to pick up - sock.pending.push(newsock); - } else { - // create a peer on the listen socket so calling sendto - // with the listen socket and an address will resolve - // to the correct client - SOCKFS.websocket_sock_ops.createPeer(sock, ws); - } - - if (Module['networkCallback']) { -console.log("On connection triggering networkCallback"); - - Module['networkCallback'](); - } - - Module['websocket'].emit('connection', 10); - - - }); - sock.server.on('closed', function() { -console.log('sock.server closed'); - Module['websocket'].emit('close', 10); - sock.server = null; - }); - sock.server.on('error', function(error) { -console.log('sock.server error'); - Module['websocket'].emit('error', error); - // don't throw - }); - }, - accept: function(listensock) { - if (!listensock.server) { - throw new FS.ErrnoError(ERRNO_CODES.EINVAL); - } - - var newsock = 
listensock.pending.shift(); - newsock.stream.flags = listensock.stream.flags; - return newsock; - }, - getname: function(sock, peer) { -console.log('getname'); - var addr, port; - if (peer) { - if (sock.daddr === undefined || sock.dport === undefined) { - throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); - } - addr = sock.daddr; - port = sock.dport; - } else { - // TODO saddr and sport will be set for bind()'d UDP sockets, but what - // should we be returning for TCP sockets that've been connect()'d? - addr = sock.saddr || 0; - port = sock.sport || 0; - } - return { addr: addr, port: port }; - }, - sendmsg: function(sock, buffer, offset, length, addr, port) { - if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) { - // connection-less sockets will honor the message address, - // and otherwise fall back to the bound destination address - if (addr === undefined || port === undefined) { - addr = sock.daddr; - port = sock.dport; - } - // if there was no address to fall back to, error out - if (addr === undefined || port === undefined) { - throw new FS.ErrnoError(ERRNO_CODES.EDESTADDRREQ); - } - } else { - // connection-based sockets will only use the bound - addr = sock.daddr; - port = sock.dport; - } - - // find the peer for the destination address - var dest = SOCKFS.websocket_sock_ops.getPeer(sock, addr, port); - - // early out if not connected with a connection-based socket - if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) { - if (!dest || dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) { - throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); - } else if (dest.socket.readyState === dest.socket.CONNECTING) { - throw new FS.ErrnoError(ERRNO_CODES.EAGAIN); - } - } - - // create a copy of the incoming data to send, as the WebSocket API - // doesn't work entirely with an ArrayBufferView, it'll just send - // the entire underlying buffer - var data; - if (buffer instanceof Array || buffer instanceof ArrayBuffer) { - data = 
buffer.slice(offset, offset + length); - } else { // ArrayBufferView - data = buffer.buffer.slice(buffer.byteOffset + offset, buffer.byteOffset + offset + length); - } - - // if we're emulating a connection-less dgram socket and don't have - // a cached connection, queue the buffer to send upon connect and - // lie, saying the data was sent now. - if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) { - if (!dest || dest.socket.readyState !== dest.socket.OPEN) { - // if we're not connected, open a new connection - if (!dest || dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) { - dest = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port); - } -#if SOCKET_DEBUG - Module.print('websocket queuing (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]); -#endif - dest.dgram_send_queue.push(data); - return length; - } - } - - try { -#if SOCKET_DEBUG - Module.print('websocket send (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]); -#endif - - // send the actual data - dest.socket.send(data); - - return length; - } catch (e) { - throw new FS.ErrnoError(ERRNO_CODES.EINVAL); - } - }, - recvmsg: function(sock, length) { - // http://pubs.opengroup.org/onlinepubs/7908799/xns/recvmsg.html - if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) { - // tcp servers should not be recv()'ing on the listen socket - throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); - } - - var queued = sock.recv_queue.shift(); - if (!queued) { - if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) { - var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport); - - if (!dest) { - // if we have a destination address but are not connected, error out - throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); - } - else if (dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) { - // return null if the socket has closed - return null; - } - else { - // 
else, our socket is in a valid state but truly has nothing available - throw new FS.ErrnoError(ERRNO_CODES.EAGAIN); - } - } else { - throw new FS.ErrnoError(ERRNO_CODES.EAGAIN); - } - } - - // queued.data will be an ArrayBuffer if it's unadulterated, but if it's - // requeued TCP data it'll be an ArrayBufferView - var queuedLength = queued.data.byteLength || queued.data.length; - var queuedOffset = queued.data.byteOffset || 0; - var queuedBuffer = queued.data.buffer || queued.data; - var bytesRead = Math.min(length, queuedLength); - var res = { - buffer: new Uint8Array(queuedBuffer, queuedOffset, bytesRead), - addr: queued.addr, - port: queued.port - }; - -#if SOCKET_DEBUG - Module.print('websocket read (' + bytesRead + ' bytes): ' + [Array.prototype.slice.call(res.buffer)]); -#endif - - // push back any unread data for TCP connections - if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && bytesRead < queuedLength) { - var bytesRemaining = queuedLength - bytesRead; -#if SOCKET_DEBUG - Module.print('websocket read: put back ' + bytesRemaining + ' bytes'); -#endif - queued.data = new Uint8Array(queuedBuffer, queuedOffset + bytesRead, bytesRemaining); - sock.recv_queue.unshift(queued); - } - - return res; - } - } - }, - - emscripten_set_network_callback: function(func) { - - function _func() { - try { - Runtime.dynCall('v', func); - } catch (e) { - if (e instanceof ExitStatus) { - return; - } else { - if (e && typeof e === 'object' && e.stack) Module.printErr('exception thrown: ' + [e, e.stack]); - throw e; - } - } - }; - - Module['noExitRuntime'] = true; - Module['networkCallback'] = _func; - } - -}); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a78327f/tests/javascript/soak.js ---------------------------------------------------------------------- diff --git a/tests/javascript/soak.js b/tests/javascript/soak.js new file mode 100755 index 0000000..c561989 --- /dev/null +++ b/tests/javascript/soak.js @@ -0,0 +1,99 @@ +#!/usr/bin/env node +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Check if the environment is Node.js and if so import the required library. +if (typeof exports !== "undefined" && exports !== null) { + proton = require("qpid-proton"); +} + +var addr = 'guest:guest@localhost:5673'; +//var addr = 'localhost:5673'; +var address = 'amqp://' + addr; +console.log(address); + +var subscriptionQueue = ''; +var subscription; +var subscribed = false; +var count = 0; +var start = 0; // Start Time. + +var message = new proton.Message(); +var messenger = new proton.Messenger(); + +var pumpData = function() { + if (!subscribed) { + var subscriptionAddress = subscription.getAddress(); + if (subscriptionAddress) { + subscribed = true; + var splitAddress = subscriptionAddress.split('/'); + subscriptionQueue = splitAddress[splitAddress.length - 1]; + onSubscription(); + } + } + + while (messenger.incoming()) { + // The second parameter forces Binary payloads to be decoded as strings + // this is useful because the broker QMF Agent encodes strings as AMQP + // binary, which is a right pain from an interoperability perspective. 
+ var t = messenger.get(message, true); + //console.log("Address: " + message.getAddress()); + //console.log("Content: " + message.body); + messenger.accept(t); + + if (count % 1000 === 0) { + var time = +new Date(); + console.log("count = " + count + ", duration = " + (time - start) + ", rate = " + ((count*1000)/(time - start))); + } + + sendMessage(); + } + + if (messenger.isStopped()) { + message.free(); + messenger.free(); + } +}; + +var sendMessage = function() { + var msgtext = "Message Number " + count; + count++; + + message.setAddress(address + '/' + subscriptionQueue); + message.body = msgtext; + messenger.put(message); +//messenger.settle(); +}; + +messenger.on('error', function(error) {console.log(error);}); +messenger.on('work', pumpData); +//messenger.setOutgoingWindow(1024); +messenger.setIncomingWindow(1024); // The Java Broker seems to need this. +messenger.start(); + +subscription = messenger.subscribe('amqp://' + addr + '/#'); +messenger.recv(); // Receive as many messages as messenger can buffer. + +var onSubscription = function() { + console.log("Subscription Queue: " + subscriptionQueue); + start = +new Date(); + sendMessage(); +}; + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
