JavaScript binding is now pretty much in a releasable state. It needs a little bit of tidying up and a few more tests and examples, but it now has a more or less complete implementation of qpid-config to test interoperability with C++ Maps and Lists. Proton-c now compiles cleanly using emscripten, as I have made all the necessary changes to emscripten itself, so all the Proton JavaScript code is what might be considered a wrapper around proton-c; it is very similar indeed to the Python binding. It still needs some performance tests and profiling, but functionally it covers pretty much all of Messenger, Message and Codec.
git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/branches/fadams-javascript-binding@1616550 13f79535-47bb-0310-9956-ffa450edef68 Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a61e5f9c Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a61e5f9c Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a61e5f9c Branch: refs/heads/master Commit: a61e5f9c35b530097b808f27424781271495e9f9 Parents: 6aed854 Author: fadams <fadams@unknown> Authored: Thu Aug 7 17:20:37 2014 +0000 Committer: fadams <fadams@unknown> Committed: Thu Aug 7 17:20:37 2014 +0000 ---------------------------------------------------------------------- examples/messenger/c/recv-async.c | 246 ++-- examples/messenger/c/send-async.c | 183 ++- examples/messenger/javascript/client.js | 102 ++ examples/messenger/javascript/drain.js | 58 +- examples/messenger/javascript/proxy.js | 100 ++ examples/messenger/javascript/qpid-config.js | 1375 +++++++++++++++++++++ examples/messenger/javascript/recv.js | 67 + examples/messenger/javascript/send.js | 92 ++ examples/messenger/javascript/server.js | 79 ++ examples/messenger/javascript/spout.js | 144 ++- examples/messenger/javascript/ws2tcp.js | 162 +++ proton-c/bindings/javascript/CMakeLists.txt | 22 +- proton-c/bindings/javascript/TODO | 24 +- proton-c/bindings/javascript/binding.c | 78 +- proton-c/bindings/javascript/binding.js | 1067 ++++++++++++++-- proton-c/bindings/javascript/my-library.js | 68 +- tests/javascript/codec.js | 2 +- tests/javascript/message.js | 301 +++++ 18 files changed, 3619 insertions(+), 551 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/examples/messenger/c/recv-async.c ---------------------------------------------------------------------- diff --git a/examples/messenger/c/recv-async.c b/examples/messenger/c/recv-async.c index 
cb50635..1f49166 100644 --- a/examples/messenger/c/recv-async.c +++ b/examples/messenger/c/recv-async.c @@ -18,6 +18,8 @@ * */ +// This is a re-implementation of recv.c using non-blocking/asynchronous calls. + #include "proton/message.h" #include "proton/messenger.h" @@ -28,9 +30,11 @@ #if EMSCRIPTEN #include <emscripten.h> -void emscripten_set_network_callback(void (*func)()); #endif +pn_message_t * message; +pn_messenger_t * messenger; + #define check(messenger) \ { \ if(pn_messenger_errno(messenger)) \ @@ -39,171 +43,151 @@ void emscripten_set_network_callback(void (*func)()); } \ } \ -// FA Temporarily make these global - pn_message_t * message; - pn_messenger_t * messenger; - void die(const char *file, int line, const char *message) { - fprintf(stderr, "%s:%i: %s\n", file, line, message); - exit(1); + fprintf(stderr, "%s:%i: %s\n", file, line, message); + exit(1); } void usage(void) { - printf("Usage: recv [options] <addr>\n"); - printf("-c \tPath to the certificate file.\n"); - printf("-k \tPath to the private key file.\n"); - printf("-p \tPassword for the private key.\n"); - printf("<addr>\tAn address.\n"); - exit(0); + printf("Usage: recv [options] <addr>\n"); + printf("-c \tPath to the certificate file.\n"); + printf("-k \tPath to the private key file.\n"); + printf("-p \tPassword for the private key.\n"); + printf("<addr>\tAn address.\n"); + exit(0); } void process(void) { -//printf(" *** process ***\n"); - - // Process incoming messages - while(pn_messenger_incoming(messenger)) { -printf("in while loop\n"); - - pn_messenger_get(messenger, message); - check(messenger); - pn_tracker_t tracker = pn_messenger_incoming_tracker(messenger); -printf("tracker = %ld:%ld\n", (long)(tracker >> 32), (long)tracker); - - char buffer[1024]; - size_t buffsize = sizeof(buffer); - pn_data_t *body = pn_message_body(message); - pn_data_format(body, buffer, &buffsize); - - printf("Address: %s\n", pn_message_get_address(message)); - const char* subject = 
pn_message_get_subject(message); - printf("Subject: %s\n", subject ? subject : "(no subject)"); - printf("Content: %s\n", buffer); - - - - int err = pn_messenger_accept(messenger, tracker, 0); -printf("err = %d\n\n", err); + pn_messenger_get(messenger, message); + check(messenger); + + { + pn_tracker_t tracker = pn_messenger_incoming_tracker(messenger); + char buffer[1024]; + size_t buffsize = sizeof(buffer); + const char* subject = pn_message_get_subject(message); + pn_data_t* body = pn_message_body(message); + pn_data_format(body, buffer, &buffsize); + + printf("Address: %s\n", pn_message_get_address(message)); + printf("Subject: %s\n", subject ? subject : "(no subject)"); + printf("Content: %s\n", buffer); + + pn_messenger_accept(messenger, tracker, 0); + } } } -// Callback used by emscripten to ensure pn_messenger_work gets called. -void work(void) { -//printf(" *** work ***\n"); - - int err = pn_messenger_work(messenger, 0); -printf("err = %d\n", err); - - if (err >= 0) { +#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler. 
+void pump(int fd, void* userData) { + while (pn_messenger_work(messenger, 0) >= 0) { process(); } +} - err = pn_messenger_work(messenger, 0); -printf("err = %d\n", err); +void onclose(int fd, void* userData) { + process(); +} - if (err >= 0) { - process(); - } +void onerror(int fd, int errno, const char* msg, void* userData) { + printf("error callback fd = %d, errno = %d, msg = %s\n", fd, errno, msg); } +#endif int main(int argc, char** argv) { - char* certificate = NULL; - char* privatekey = NULL; - char* password = NULL; - char* address = (char *) "amqp://~0.0.0.0"; - int c; - opterr = 0; - - while((c = getopt(argc, argv, "hc:k:p:")) != -1) - { - switch(c) - { - case 'h': - usage(); - break; - - case 'c': certificate = optarg; break; - case 'k': privatekey = optarg; break; - case 'p': password = optarg; break; - - case '?': - if(optopt == 'c' || - optopt == 'k' || - optopt == 'p') - { - fprintf(stderr, "Option -%c requires an argument.\n", optopt); - } - else if(isprint(optopt)) - { - fprintf(stderr, "Unknown option `-%c'.\n", optopt); - } - else - { - fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt); - } - return 1; - default: - abort(); - } - } - - if (optind < argc) - { - address = argv[optind]; - } - -// pn_message_t * message; -// pn_messenger_t * messenger; - - message = pn_message(); - messenger = pn_messenger(NULL); -pn_messenger_set_blocking(messenger, false); // FA Addition. - + char* certificate = NULL; + char* privatekey = NULL; + char* password = NULL; + char* address = (char *) "amqp://~0.0.0.0"; + int c; + message = pn_message(); + messenger = pn_messenger(NULL); + pn_messenger_set_blocking(messenger, false); // Needs to be set non-blocking to behave asynchronously. -//pn_messenger_set_incoming_window(messenger, 1024); // FA Addition. 
+ opterr = 0; + while((c = getopt(argc, argv, "hc:k:p:")) != -1) + { + switch(c) + { + case 'h': + usage(); + break; + + case 'c': certificate = optarg; break; + case 'k': privatekey = optarg; break; + case 'p': password = optarg; break; + + case '?': + if (optopt == 'c' || + optopt == 'k' || + optopt == 'p') + { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } + else if(isprint(optopt)) + { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + else + { + fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt); + } + return 1; + default: + abort(); + } + } + if (optind < argc) + { + address = argv[optind]; + } - /* load the various command line options if they're set */ - if(certificate) - { - pn_messenger_set_certificate(messenger, certificate); - } + /* load the various command line options if they're set */ + if(certificate) + { + pn_messenger_set_certificate(messenger, certificate); + } - if(privatekey) - { - pn_messenger_set_private_key(messenger, privatekey); - } + if(privatekey) + { + pn_messenger_set_private_key(messenger, privatekey); + } - if(password) - { - pn_messenger_set_password(messenger, password); - } + if(password) + { + pn_messenger_set_password(messenger, password); + } - pn_messenger_start(messenger); - check(messenger); + pn_messenger_start(messenger); + check(messenger); - pn_messenger_subscribe(messenger, address); - check(messenger); + pn_messenger_subscribe(messenger, address); + check(messenger); - pn_messenger_recv(messenger, -1); // Receive as many messages as messenger can buffer + pn_messenger_recv(messenger, -1); // Set to receive as many messages as messenger can buffer. -#if EMSCRIPTEN - //emscripten_set_main_loop(work, 0, 0); +#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler. + emscripten_set_socket_error_callback(NULL, onerror); - emscripten_set_network_callback(work); -#else - while (1) { - pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity. 
- process(); - } + emscripten_set_socket_open_callback(NULL, pump); + emscripten_set_socket_connection_callback(NULL, pump); + emscripten_set_socket_message_callback(NULL, pump); + emscripten_set_socket_close_callback(NULL, onclose); +#else // For native compiler. + while (1) { + pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity. + process(); + } #endif - return 0; + return 0; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/examples/messenger/c/send-async.c ---------------------------------------------------------------------- diff --git a/examples/messenger/c/send-async.c b/examples/messenger/c/send-async.c index f8437c4..2c76e6c 100644 --- a/examples/messenger/c/send-async.c +++ b/examples/messenger/c/send-async.c @@ -18,6 +18,8 @@ * */ +// This is a re-implementation of send.c using non-blocking/asynchronous calls. + #include "proton/message.h" #include "proton/messenger.h" #include "proton/driver.h" @@ -30,9 +32,13 @@ #if EMSCRIPTEN #include <emscripten.h> -void emscripten_set_network_callback(void (*func)()); #endif +pn_message_t * message; +pn_messenger_t * messenger; +pn_tracker_t tracker; +int running = 1; + #define check(messenger) \ { \ if(pn_messenger_errno(messenger)) \ @@ -41,152 +47,121 @@ void emscripten_set_network_callback(void (*func)()); } \ } \ -// FA Temporarily make global - pn_message_t * message; - pn_messenger_t * messenger; - -pn_tracker_t tracker; -int tracked = 1; - -int running = 1; - - void die(const char *file, int line, const char *message) { - fprintf(stderr, "%s:%i: %s\n", file, line, message); - exit(1); + fprintf(stderr, "%s:%i: %s\n", file, line, message); + exit(1); } void usage(void) { - printf("Usage: send [-a addr] [message]\n"); - printf("-a \tThe target address [amqp[s]://domain[/name]]\n"); - printf("message\tA text string to send.\n"); - exit(0); + printf("Usage: send [-a addr] [message]\n"); + printf("-a \tThe target address [amqp[s]://domain[/name]]\n"); + 
printf("message\tA text string to send.\n"); + exit(0); } void process(void) { -//printf(" *** process ***\n"); - - // Process outgoing messages - pn_status_t status = pn_messenger_status(messenger, tracker); -//printf("status = %d\n", status); - if (status != PN_STATUS_PENDING) { -printf("status = %d\n", status); - - //pn_messenger_settle(messenger, tracker, 0); - //tracked--; - if (running) { -printf("stopping\n"); pn_messenger_stop(messenger); running = 0; } } if (pn_messenger_stopped(messenger)) { -printf("exiting\n"); pn_message_free(message); pn_messenger_free(messenger); - exit(0); } } - - -// Callback used by emscripten to ensure pn_messenger_work gets called. -void work(void) { -//printf(" *** work ***\n"); - - int err = pn_messenger_work(messenger, 0); -printf("err = %d\n", err); - - if (err >= 0) { +#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler. +void pump(int fd, void* userData) { + while (pn_messenger_work(messenger, 0) >= 0) { process(); } +} - err = pn_messenger_work(messenger, 0); -printf("err = %d\n", err); +void onclose(int fd, void* userData) { + process(); +} - if (err >= 0) { - process(); - } +void onerror(int fd, int errno, const char* msg, void* userData) { + printf("error callback fd = %d, errno = %d, msg = %s\n", fd, errno, msg); } +#endif int main(int argc, char** argv) { - int c; - opterr = 0; - char * address = (char *) "amqp://0.0.0.0"; - char * msgtext = (char *) "Hello World!"; - - while((c = getopt(argc, argv, "ha:b:c:")) != -1) - { - switch(c) + int c; + opterr = 0; + char * address = (char *) "amqp://0.0.0.0"; + char * msgtext = (char *) "Hello World!"; + + while((c = getopt(argc, argv, "ha:b:c:")) != -1) { - case 'a': address = optarg; break; - case 'h': usage(); break; - - case '?': - if(optopt == 'a') - { - fprintf(stderr, "Option -%c requires an argument.\n", optopt); - } - else if(isprint(optopt)) - { - fprintf(stderr, "Unknown option `-%c'.\n", optopt); - } - else - { - fprintf(stderr, "Unknown option 
character `\\x%x'.\n", optopt); - } - return 1; - default: - abort(); + switch(c) + { + case 'a': address = optarg; break; + case 'h': usage(); break; + + case '?': + if(optopt == 'a') + { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } + else if(isprint(optopt)) + { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + else + { + fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt); + } + return 1; + default: + abort(); + } } - } - - if (optind < argc) msgtext = argv[optind]; - -// pn_message_t * message; -// pn_messenger_t * messenger; - - message = pn_message(); - messenger = pn_messenger(NULL); - pn_messenger_set_blocking(messenger, false); // Put messenger into non-blocking mode. + if (optind < argc) msgtext = argv[optind]; - pn_messenger_set_outgoing_window(messenger, 1024); // FA Addition. + message = pn_message(); + messenger = pn_messenger(NULL); + pn_messenger_set_blocking(messenger, false); // Needs to be set non-blocking to behave asynchronously. + pn_messenger_set_outgoing_window(messenger, 1024); + pn_messenger_start(messenger); + pn_message_set_address(message, address); + pn_data_t* body = pn_message_body(message); + pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext)); + pn_messenger_put(messenger, message); + check(messenger); - pn_messenger_start(messenger); + tracker = pn_messenger_outgoing_tracker(messenger); - pn_message_set_address(message, address); - pn_data_t *body = pn_message_body(message); - pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext)); +#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler. 
+ emscripten_set_socket_error_callback(NULL, onerror); - pn_messenger_put(messenger, message); - check(messenger); - - tracker = pn_messenger_outgoing_tracker(messenger); -//printf("tracker = %lld\n", (long long int)tracker); - - -#if EMSCRIPTEN - //emscripten_set_main_loop(work, 0, 0); + emscripten_set_socket_open_callback(NULL, pump); + emscripten_set_socket_connection_callback(NULL, pump); + emscripten_set_socket_message_callback(NULL, pump); + emscripten_set_socket_close_callback(NULL, onclose); +#else // For native compiler. + while (running) { + pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity. + process(); + } - emscripten_set_network_callback(work); -#else - while (1) { - pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity. - process(); - } + while (!pn_messenger_stopped(messenger)) { + pn_messenger_work(messenger, 0); + process(); + } #endif - return 0; + return 0; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/examples/messenger/javascript/client.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/client.js b/examples/messenger/javascript/client.js new file mode 100644 index 0000000..c9419a2 --- /dev/null +++ b/examples/messenger/javascript/client.js @@ -0,0 +1,102 @@ +#!/usr/bin/env node +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Simple client for use with server.js illustrating request/response + +// Check if the environment is Node.js and if so import the required library. +if (typeof exports !== "undefined" && exports !== null) { + proton = require("qpid-proton"); +} + +var address = "amqp://0.0.0.0"; +var subject = "UK.WEATHER"; +var replyTo = "~/replies"; +var msgtext = "Hello World!"; +var tracker = null; +var running = true; + +var message = new proton.Message(); +var messenger = new proton.Messenger(); + +var pumpData = function() { + while (messenger.incoming()) { + var t = messenger.get(message); + + console.log("Reply"); + console.log("Address: " + message.getAddress()); + console.log("Subject: " + message.getSubject()); + + // body is the body as a native JavaScript Object, useful for most real cases. 
+ //console.log("Content: " + message.body); + + // data is the body as a proton.Data Object, used in this case because + // format() returns exactly the same representation as recv.c + console.log("Content: " + message.data.format()); + + messenger.accept(t); + messenger.stop(); + } + + if (messenger.isStopped()) { + message.free(); + messenger.free(); + } +}; + +var args = process.argv.slice(2); +if (args.length > 0) { + if (args[0] === '-h' || args[0] === '--help') { + console.log("Usage: node client.js [-r replyTo] [-s subject] <addr> (default " + address + ")"); + console.log("Options:"); + console.log(" -r <reply to> The message replyTo (default " + replyTo + ")"); + console.log(" -s <subject> The message subject (default " + subject + ")"); + process.exit(0); + } + + for (var i = 0; i < args.length; i++) { + var arg = args[i]; + if (arg.charAt(0) === '-') { + i++; + var val = args[i]; + if (arg === '-r') { + replyTo = val; + } else if (arg === '-s') { + subject = val; + } + } else { + address = arg; + } + } +} + +messenger.on('error', function(error) {console.log(error);}); +messenger.on('work', pumpData); +messenger.setOutgoingWindow(1024); +messenger.start(); + +message.setAddress(address); +message.setSubject(subject); +message.setReplyTo(replyTo); +message.body = msgtext; + +tracker = messenger.put(message); +messenger.recv(); // Receive as many messages as messenger can buffer. + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/examples/messenger/javascript/drain.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/drain.js b/examples/messenger/javascript/drain.js index eacd8a2..923e29a 100644 --- a/examples/messenger/javascript/drain.js +++ b/examples/messenger/javascript/drain.js @@ -1,3 +1,4 @@ +#!/usr/bin/env node /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -23,40 +24,45 @@ if (typeof exports !== "undefined" && exports !== null) { proton = require("qpid-proton"); } -try { - var address = "amqp://~0.0.0.0"; - var message = new proton.Message(); - var messenger = new proton.Messenger(); +console.log("drain not implemented yet"); +process.exit(0); - function _process() { -// console.log(" *** process ***"); +var address = "amqp://~0.0.0.0"; +var message = new proton.Message(); +var messenger = new proton.Messenger(); - // Process incoming messages +var pumpData = function() { + while (messenger.incoming()) { + var t = messenger.get(message); - while (messenger.incoming()) { -console.log("in while loop\n"); + console.log("Address: " + message.getAddress()); + console.log("Subject: " + message.getSubject()); - var tracker = messenger.get(message); -console.log("tracker = " + tracker); + // body is the body as a native JavaScript Object, useful for most real cases. + //console.log("Content: " + message.body); - console.log("Address: " + message.getAddress()); - console.log("Subject: " + message.getSubject()); - console.log("Content: " + message.body); + // data is the body as a proton.Data Object, used in this case because + // format() returns exactly the same representation as recv.c + console.log("Content: " + message.data.format()); - messenger.accept(tracker); - } - }; + messenger.accept(t); + } +}; - //messenger.setIncomingWindow(1024); +var args = process.argv.slice(2); +if (args.length > 0) { + if (args[0] === '-h' || args[0] === '--help') { + console.log("Usage: recv <addr> (default " + address + ")."); + process.exit(0); + } - messenger.setNetworkCallback(_process); - messenger.start(); - - messenger.subscribe(address); - messenger.recv(); // Receive as many messages as messenger can buffer. 
- -} catch(e) { - console.log("Caught Exception " + e); + address = args[0]; } +messenger.on('error', function(error) {console.log(error);}); +messenger.on('work', pumpData); +messenger.start(); + +messenger.subscribe(address); +messenger.recv(); // Receive as many messages as messenger can buffer. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/examples/messenger/javascript/proxy.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/proxy.js b/examples/messenger/javascript/proxy.js new file mode 100755 index 0000000..bd1c208 --- /dev/null +++ b/examples/messenger/javascript/proxy.js @@ -0,0 +1,100 @@ +#!/usr/bin/env node +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * proxy.js is a simple node.js command line application that uses the ws2tcp.js + * library to proxy from a WebSocket to a TCP Socket or vice versa. 
+ * <p> + * Usage: node proxy.js [options] + * Options: + * -p <listen port>, --port <listen port> (default 5673 for ws2tcp + * 5672 for tcp2ws) + * -t <target port>, --tport <target port> (default listen port - 1 for ws2tcp + * listen port + 1 for tcp2ws) + * -h <target host>, --thost <target host> (default 0.0.0.0) + * -m <ws2tcp or tcp2ws>, --method <ws2tcp or tcp2ws> (default ws2tcp) + * @Author Fraser Adams + * @file + */ + +var proxy = require('./ws2tcp.js'); + +var lport = 5673; +var tport = lport - 1; +var thost = '0.0.0.0'; +var method = 'ws2tcp'; + +var args = process.argv.slice(2); +if (args.length > 0) { + if (args[0] === '-h' || args[0] === '--help') { + console.log("Usage: node proxy.js [options]"); + console.log("Options:"); + console.log(" -p <listen port>, --port <listen port> (default " + lport + " for ws2tcp"); + console.log(" " + tport + " for tcp2ws)"); + console.log(" -t <target port>, --tport <target port> (default listen port - 1 for ws2tcp"); + console.log(" listen port + 1 for tcp2ws)"); + console.log(" -h <target host>, --thost <target host> (default " + thost + ")"); + console.log(" -m <ws2tcp or tcp2ws>, --method <ws2tcp or tcp2ws> (default " + method + ")"); + process.exit(0); + } + + var lportSet = false; + var tportSet = false; + for (var i = 0; i < args.length; i++) { + var arg = args[i]; + if (arg.charAt(0) === '-') { + i++; + var val = args[i]; + if (arg === '-p' || arg === '--port') { + lport = val; + lportSet = true; + } else if (arg === '-t' || arg === '--tport') { + tport = val; + tportSet = true; + } else if (arg === '-h' || arg === '--thost') { + thost = val; + } else if (arg === '-m' || arg === '--method') { + method = val; + } + } + } + + if (method === 'tcp2ws' && !lportSet) { + lport--; + } + + if (!tportSet) { + tport = (method === 'ws2tcp') ? 
lport - 1 : +lport + 1; + } +} + +if (method === 'tcp2ws') { + console.log("Proxying tcp -> ws"); + console.log("Forwarding port " + lport + " to " + thost + ":" + tport); + proxy.tcp2ws(lport, thost, tport, 'AMQPWSB10'); +} else if (method === 'ws2tcp') { + console.log("Proxying ws -> tcp"); + console.log("Forwarding port " + lport + " to " + thost + ":" + tport); + proxy.ws2tcp(lport, thost, tport); +} else { + console.error("Method must be either ws2tcp or tcp2ws."); +} + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/examples/messenger/javascript/qpid-config.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/qpid-config.js b/examples/messenger/javascript/qpid-config.js new file mode 100755 index 0000000..466f8b6 --- /dev/null +++ b/examples/messenger/javascript/qpid-config.js @@ -0,0 +1,1375 @@ +#!/usr/bin/env node +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * Port of qpid-config to JavaScript for node.js, mainly intended as a demo to + * illustrate using QMF2 in JavaScript using the proton.Messenger JS binding. 
+ * It illustrates a few things including how to use Messenger completely + * asynchronously including using an async request/response pattern with + * correlation IDs. It also proves interoperability of AMQP Map, List etc. + * between C++ and JavaScript as QMF2 is pretty much all about Lists of Maps. + * <p> + * The actual QMF2 code is pretty simple as we're just doing a basic getObjects + * it's made all the simpler because we can use JavaScript object literals as + * the JavaScript binding serialises and deserialises directly between JavaScript + * Objects and Lists and the AMQP type system so something that can be quite + * involved in languages like C++ and Java becomes quite simple in JavaScript, + * though the asynchronous nature of JavaScript provides its own opportunities + * for complication best illustrated by the need for the correlator object. + */ + +// Check if the environment is Node.js and if so import the required library. +if (typeof exports !== "undefined" && exports !== null) { + proton = require("qpid-proton"); +} + +var address = 'amqp://0.0.0.0:5673/qmf.default.direct'; +var replyTo = ''; +var subscription; +var subscribed = false; + +var message = new proton.Message(); +var messenger = new proton.Messenger(); + +/** + * The correlator object is a mechanism used to correlate requests with their + * asynchronous responses. It might possibly be better to make use of Promises + * to implement part of this behaviour but a mechanism would still be needed to + * correlate a request with its response callback in order to wrap things up in + * a Promise, so much of the behaviour of this object would still be required. + * In addition it seemed to make sense to make this QMF2 implementation fairly + * free of dependencies and using Promises would require external libraries. 
+ * Instead the correlator implements Promise-like semantics, you might call it + * a broken Promise :-) + * in particular the request method behaves a *bit* like Promise.all() though it + * is mostly fake and takes an array of functions that call the add() method + * which is really the method used to associate response objects by correlationID. + * The then method is used to register a listener that will be called when all + * the requests that have been registered have received responses. + * TODO error/timeout handling. + */ +var correlator = { + _resolve: null, + _objects: {}, + add: function(id) { + this._objects[id] = {complete: false, list: null}; + }, + request: function() { + this._resolve = function() {console.log("Warning: No resolver has been set")}; + return this; + }, + then: function(resolver) { + this._resolve = resolver ? resolver : this._resolve; + }, + resolve: function() { + var opcode = message.properties['qmf.opcode']; + var correlationID = message.getCorrelationID(); + var resp = this._objects[correlationID]; + if (opcode === '_query_response') { + if (resp.list) { + Array.prototype.push.apply(resp.list, message.body); // This is faster than concat. + } else { + resp.list = message.body; + } + + var partial = message.properties['partial']; + if (!partial) { + resp.complete = true; + } + + this._objects[correlationID] = resp; + this._checkComplete(); + } else if (opcode === '_method_response' || opcode === '_exception') { + resp.list = message.body; + resp.complete = true; + this._objects[correlationID] = resp; + this._checkComplete(); + } else { + console.error("Bad Message response, qmf.opcode = " + opcode); + } + }, + _checkComplete: function() { + var response = {}; + for (var id in this._objects) { + var object = this._objects[id]; + if (object.complete) { + response[id] = object.list; + } else { + return; + } + } + + this._objects = {}; // Clear state ready for next call. + this._resolve(response.method ? 
response.method : response); + } +}; + +var pumpData = function() { + if (!subscribed) { + var subscriptionAddress = subscription.getAddress(); + if (subscriptionAddress) { + subscribed = true; + var splitAddress = subscriptionAddress.split('/'); + replyTo = splitAddress[splitAddress.length - 1]; + + onSubscription(); + } + } + + while (messenger.incoming()) { + // The second parameter forces Binary payloads to be decoded as strings + // this is useful because the broker QMF Agent encodes strings as AMQP + // binary, which is a right pain from an interoperability perspective. + var t = messenger.get(message, true); + correlator.resolve(); + messenger.accept(t); + } + + if (messenger.isStopped()) { + message.free(); + messenger.free(); + } +}; + +var getObjects = function(packageName, className) { + message.setAddress(address); + message.setSubject('broker'); + message.setReplyTo(replyTo); + message.setCorrelationID(className); + message.properties = { + "x-amqp-0-10.app-id": "qmf2", + "method": "request", + "qmf.opcode": "_query_request", + }; + message.body = { + "_what": "OBJECT", + "_schema_id": { + "_package_name": packageName, + "_class_name": className + } + }; + + correlator.add(className); + messenger.put(message); +}; + +var invokeMethod = function(object, method, arguments) { + var correlationID = 'method'; + message.setAddress(address); + message.setSubject('broker'); + message.setReplyTo(replyTo); + message.setCorrelationID(correlationID); + message.properties = { + "x-amqp-0-10.app-id": "qmf2", + "method": "request", + "qmf.opcode": "_method_request", + }; + message.body = { + "_object_id": object._object_id, + "_method_name" : method, + "_arguments" : arguments + }; + + correlator.add(correlationID); + messenger.put(message); +}; + +messenger.on('error', function(error) {console.log(error);}); +messenger.on('work', pumpData); +messenger.setOutgoingWindow(1024); +messenger.start(); + +subscription = messenger.subscribe('amqp://0.0.0.0:5673/#'); 
+messenger.recv(); // Receive as many messages as messenger can buffer. + + +/************************* qpid-config business logic ************************/ + +var _usage = +'Usage: qpid-config [OPTIONS]\n' + +' qpid-config [OPTIONS] exchanges [filter-string]\n' + +' qpid-config [OPTIONS] queues [filter-string]\n' + +' qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]\n' + +' qpid-config [OPTIONS] del exchange <name>\n' + +' qpid-config [OPTIONS] add queue <name> [AddQueueOptions]\n' + +' qpid-config [OPTIONS] del queue <name> [DelQueueOptions]\n' + +' qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]\n' + +' <for type xml> [-f -|filename]\n' + +' <for type header> [all|any] k1=v1 [, k2=v2...]\n' + +' qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]\n' + +' qpid-config [OPTIONS] reload-acl\n' + +' qpid-config [OPTIONS] add <type> <name> [--argument <property-name>=<property-value>]\n' + +' qpid-config [OPTIONS] del <type> <name>\n' + +' qpid-config [OPTIONS] list <type> [--show-property <property-name>]\n'; + +var usage = function() { + console.log(_usage); + process.exit(-1); +}; + +var _description = +'Examples:\n' + +'\n' + +'$ qpid-config add queue q\n' + +'$ qpid-config add exchange direct d -a localhost:5672\n' + +'$ qpid-config exchanges -b 10.1.1.7:10000\n' + +'$ qpid-config queues -b guest/guest@broker-host:10000\n' + +'\n' + +'Add Exchange <type> values:\n' + +'\n' + +' direct Direct exchange for point-to-point communication\n' + +' fanout Fanout exchange for broadcast communication\n' + +' topic Topic exchange that routes messages using binding keys with wildcards\n' + +' headers Headers exchange that matches header fields against the binding keys\n' + +' xml XML Exchange - allows content filtering using an XQuery\n' + +'\n' + +'\n' + +'Queue Limit Actions:\n' + +'\n' + +' none (default) - Use broker\'s default policy\n' + +' reject - Reject enqueued messages\n' + +' ring - Replace oldest 
unacquired message with new\n' + +'\n' + +'Replication levels:\n' + +'\n' + +' none - no replication\n' + +' configuration - replicate queue and exchange existence and bindings, but not messages.\n' + +' all - replicate configuration and messages\n'; + +var _options = +'Options:\n' + +' -h, --help show this help message and exit\n' + +'\n' + +' General Options:\n' + +' -t <secs>, --timeout=<secs>\n' + +' Maximum time to wait for broker connection (in\n' + +' seconds)\n' + +' -r, --recursive Show bindings in queue or exchange list\n' + +' -b <address>, --broker=<address>\n' + +' Address of qpidd broker with syntax:\n' + +' [username/password@] hostname | ip-address [:<port>]\n' + +' -a <address>, --broker-addr=<address>\n' + +/* TODO Connection options +' --sasl-mechanism=<mech>\n' + +' SASL mechanism for authentication (e.g. EXTERNAL,\n' + +' ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n' + +' automatically picks the most secure available\n' + +' mechanism - use this option to override.\n' + +' --ssl-certificate=<cert>\n' + +' Client SSL certificate (PEM Format)\n' + +' --ssl-key=<key> Client SSL private key (PEM Format)\n' + +' --ha-admin Allow connection to a HA backup broker.\n' + +*/ +'\n' + +' Options for Listing Exchanges and Queues:\n' + +' --ignore-default Ignore the default exchange in exchange or queue list\n' + +'\n' + +' Options for Adding Exchanges and Queues:\n' + +' --alternate-exchange=<aexname>\n' + +' Name of the alternate-exchange for the new queue or\n' + +' exchange. Exchanges route messages to the alternate\n' + +' exchange if they are unable to route them elsewhere.\n' + +' Queues route messages to the alternate exchange if\n' + +' they are rejected by a subscriber or orphaned by queue\n' + +' deletion.\n' + +' --durable The new queue or exchange is durable.\n' + +' --replicate=<level>\n' + +' Enable automatic replication in a HA cluster. 
<level>\n' + +' is \'none\', \'configuration\' or \'all\').\n' + +'\n' + +' Options for Adding Queues:\n' + +' --file-count=<n> Number of files in queue\'s persistence journal\n' + +' --file-size=<n> File size in pages (64KiB/page)\n' + +' --max-queue-size=<n>\n' + +' Maximum in-memory queue size as bytes\n' + +' --max-queue-count=<n>\n' + +' Maximum in-memory queue size as a number of messages\n' + +' --limit-policy=<policy>\n' + +' Action to take when queue limit is reached\n' + +' --lvq-key=<key> Last Value Queue key\n' + +' --generate-queue-events=<n>\n' + +' If set to 1, every enqueue will generate an event that\n' + +' can be processed by registered listeners (e.g. for\n' + +' replication). If set to 2, events will be generated\n' + +' for enqueues and dequeues.\n' + +' --flow-stop-size=<n>\n' + +' Turn on sender flow control when the number of queued\n' + +' bytes exceeds this value.\n' + +' --flow-resume-size=<n>\n' + +' Turn off sender flow control when the number of queued\n' + +' bytes drops below this value.\n' + +' --flow-stop-count=<n>\n' + +' Turn on sender flow control when the number of queued\n' + +' messages exceeds this value.\n' + +' --flow-resume-count=<n>\n' + +' Turn off sender flow control when the number of queued\n' + +' messages drops below this value.\n' + +' --group-header=<header-name>\n' + +' Enable message groups. 
Specify name of header that\n' + +' holds group identifier.\n' + +' --shared-groups Allow message group consumption across multiple\n' + +' consumers.\n' + +' --argument=<NAME=VALUE>\n' + +' Specify a key-value pair to add to queue arguments\n' + +' --start-replica=<broker-url>\n' + +' Start replication from the same-named queue at\n' + +' <broker-url>\n' + +'\n' + +' Options for Adding Exchanges:\n' + +' --sequence Exchange will insert a \'qpid.msg_sequence\' field in\n' + +' the message header\n' + +' --ive Exchange will behave as an \'initial-value-exchange\',\n' + +' keeping a reference to the last message forwarded and\n' + +' enqueuing that message to newly bound queues.\n' + +'\n' + +' Options for Deleting Queues:\n' + +' --force Force delete of queue even if it\'s currently used or\n' + +' it\'s not empty\n' + +' --force-if-not-empty\n' + +' Force delete of queue even if it\'s not empty\n' + +' --force-if-used Force delete of queue even if it\'s currently used\n' + +'\n' + +' Options for Declaring Bindings:\n' + +' -f <file.xq>, --file=<file.xq>\n' + +' For XML Exchange bindings - specifies the name of a\n' + +' file containing an XQuery.\n' + +'\n' + +' Formatting options for \'list\' action:\n' + +' --show-property=<property-name>\n' + +' Specify a property of an object to be included in\n' + +' output\n'; + +var REPLICATE_LEVELS = {"none" : true, "configuration": true, "all": true}; +var DEFAULT_PROPERTIES = {"exchange":["name", "type", "durable"], "queue":["name", "durable", "autoDelete"]}; + +var config = { + _recursive : false, + _host : 'localhost', + _connTimeout : 10, + _ignoreDefault : false, + _altern_ex : null, + _durable : false, + _replicate : null, + _if_empty : true, + _if_unused : true, + _fileCount : null, + _fileSize : null, + _maxQueueSize : null, + _maxQueueCount : null, + _limitPolicy : null, + _msgSequence : false, + _lvq_key : null, + _ive : null, + _eventGeneration: null, + _file : null, + _flowStopCount : null, + _flowResumeCount: 
null, + _flowStopSize : null, + _flowResumeSize : null, + _msgGroupHeader : null, + _sharedMsgGroup : false, + _extra_arguments: [], + _start_replica : null, + _returnCode : 0, + _list_properties: [], + + getOptions: function() { + var options = {}; + for (var a = 0; a < this._extra_arguments.length; a++) { + var r = this._extra_arguments[a].split('='); + var value = null; + if (r.length === 2) { + value = r[1]; + } + options[r[0]] = value; + } + return options; + } +}; + +var FILECOUNT = 'qpid.file_count'; +var FILESIZE = 'qpid.file_size'; +var MAX_QUEUE_SIZE = 'qpid.max_size'; +var MAX_QUEUE_COUNT = 'qpid.max_count'; +var POLICY_TYPE = 'qpid.policy_type'; +var LVQ_KEY = 'qpid.last_value_queue_key'; +var MSG_SEQUENCE = 'qpid.msg_sequence'; +var IVE = 'qpid.ive'; +var QUEUE_EVENT_GENERATION = 'qpid.queue_event_generation'; +var FLOW_STOP_COUNT = 'qpid.flow_stop_count'; +var FLOW_RESUME_COUNT = 'qpid.flow_resume_count'; +var FLOW_STOP_SIZE = 'qpid.flow_stop_size'; +var FLOW_RESUME_SIZE = 'qpid.flow_resume_size'; +var MSG_GROUP_HDR_KEY = 'qpid.group_header_key'; +var SHARED_MSG_GROUP = 'qpid.shared_msg_group'; +var REPLICATE = 'qpid.replicate'; + +/** + * There are various arguments to declare that have specific program + * options in this utility. However there is now a generic mechanism for + * passing arguments as well. The SPECIAL_ARGS list contains the + * arguments for which there are specific program options defined + * i.e. 
the arguments for which there is special processing on add and + * list +*/ +var SPECIAL_ARGS={}; +SPECIAL_ARGS[FILECOUNT] = true; +SPECIAL_ARGS[FILESIZE] = true; +SPECIAL_ARGS[MAX_QUEUE_SIZE] = true; +SPECIAL_ARGS[MAX_QUEUE_COUNT] = true; +SPECIAL_ARGS[POLICY_TYPE] = true; +SPECIAL_ARGS[LVQ_KEY] = true; +SPECIAL_ARGS[MSG_SEQUENCE] = true; +SPECIAL_ARGS[IVE] = true; +SPECIAL_ARGS[QUEUE_EVENT_GENERATION] = true; +SPECIAL_ARGS[FLOW_STOP_COUNT] = true; +SPECIAL_ARGS[FLOW_RESUME_COUNT] = true; +SPECIAL_ARGS[FLOW_STOP_SIZE] = true; +SPECIAL_ARGS[FLOW_RESUME_SIZE] = true; +SPECIAL_ARGS[MSG_GROUP_HDR_KEY] = true; +SPECIAL_ARGS[SHARED_MSG_GROUP] = true; +SPECIAL_ARGS[REPLICATE] = true; + +var oid = function(id) { + return id._agent_epoch + ':' + id._object_name +}; + +var filterMatch = function(name, filter) { + if (filter === '') { + return true; + } + if (name.indexOf(filter) === -1) { + return false; + } + return true; +}; + +var idMap = function(list) { + var map = {}; + for (var i = 0; i < list.length; i++) { + var item = list[i]; + map[oid(item._object_id)] = item; + } + return map; +}; + +var renderArguments = function(obj, list) { + if (!obj) { + return ''; + } + var string = ''; + var addComma = false; + for (var prop in obj) { + if (addComma) { + string += ', '; + } + if (obj.hasOwnProperty(prop)) { + if (list) { + if (SPECIAL_ARGS[prop]) continue; + string += " --argument " + prop + "=" + obj[prop]; + } else { + string += "'" + prop + "'" + ": '" + obj[prop] + "'"; + addComma = true; + } + } + } + + if (addComma) { + return ' {' + string + '}'; + } else { + if (list) { + return string; + } else { + return ''; + } + } +}; + +/** + * The following methods illustrate the QMF2 class query mechanism which returns + * the list of QMF Objects for the specified class that are currently present + * on the Broker. The Schema <qpid>/cpp/src/qpid/broker/management-schema.xml + * describes the properties and statistics of each Management Object. 
+ * <p> + * One slightly subtle part of QMF is that certain Objects are associated via + * references, for example Binding contains queueRef and exchangeRef, which lets + * Objects link to each other using their _object_id property. + * <p> + * The implementation of these methods attempts to follow the same general flow + * as the equivalent method in the "canonical" python based qpid-config version + * but has the added complication that JavaScript is entirely asynchronous. + * The approach that has been taken is to use the correlator object that lets a + * callback function be registered via the "then" method and actually calls the + * callback when all of the requests specified in the request method have + * returned their results (which get passed as the callback parameter). + */ + +var overview = function() { + correlator.request( + // Send the QMF query requests for the specified classes. + getObjects('org.apache.qpid.broker', 'queue'), + getObjects('org.apache.qpid.broker', 'exchange') + ).then(function(objects) { + var exchanges = objects.exchange; + var queues = objects.queue; + console.log("Total Exchanges: " + exchanges.length); + var etype = {}; + for (var i = 0; i < exchanges.length; i++) { + var exchange = exchanges[i]._values; + if (!etype[exchange.type]) { + etype[exchange.type] = 1; + } else { + etype[exchange.type]++; + } + } + for (var typ in etype) { + var pad = Array(16 - typ.length).join(' '); + console.log(pad + typ + ": " + etype[typ]); + } + + console.log("\n Total Queues: " + queues.length); + var durable = 0; + for (var i = 0; i < queues.length; i++) { + var queue = queues[i]._values; + if (queue.durable) { + durable++; + } + } + console.log(" durable: " + durable); + console.log(" non-durable: " + (queues.length - durable)); + messenger.stop(); + }); +}; + +var exchangeList = function(filter) { + correlator.request( + // Send the QMF query requests for the specified classes. 
+ getObjects('org.apache.qpid.broker', 'exchange') + ).then(function(objects) { + var exchanges = objects.exchange; + var exMap = idMap(exchanges); + var caption1 = "Type "; + var caption2 = "Exchange Name"; + var maxNameLen = caption2.length; + var found = false; + for (var i = 0; i < exchanges.length; i++) { + var exchange = exchanges[i]._values; + if (filterMatch(exchange.name, filter)) { + if (exchange.name.length > maxNameLen) { + maxNameLen = exchange.name.length; + } + found = true; + } + } + if (!found) { + config._returnCode = 1; + return; + } + + var pad = Array(maxNameLen + 1 - caption2.length).join(' '); + console.log(caption1 + caption2 + pad + " Attributes"); + console.log(Array(maxNameLen + caption1.length + 13).join('=')); + + for (var i = 0; i < exchanges.length; i++) { + var exchange = exchanges[i]._values; + if (config._ignoreDefault && !exchange.name) continue; + if (filterMatch(exchange.name, filter)) { + var pad1 = Array(11 - exchange.type.length).join(' '); + var pad2 = Array(maxNameLen + 2 - exchange.name.length).join(' '); + var string = exchange.type + pad1 + exchange.name + pad2; + var args = exchange.arguments ? exchange.arguments : {}; + if (exchange.durable) { + string += ' --durable'; + } + if (args[REPLICATE]) { + string += ' --replicate=' + args[REPLICATE]; + } + if (args[MSG_SEQUENCE]) { + string += ' --sequence'; + } + if (args[IVE]) { + string += ' --ive'; + } + if (exchange.altExchange) { + string += ' --alternate-exchange=' + exMap[oid(exchange.altExchange)]._values.name; + } + console.log(string); + } + } + messenger.stop(); + }); +}; + +var exchangeListRecurse = function(filter) { + correlator.request( + // Send the QMF query requests for the specified classes. 
+ getObjects('org.apache.qpid.broker', 'queue'), + getObjects('org.apache.qpid.broker', 'exchange'), + getObjects('org.apache.qpid.broker', 'binding') + ).then(function(objects) { + var exchanges = objects.exchange; + var bindings = objects.binding; + var queues = idMap(objects.queue); + + for (var i = 0; i < exchanges.length; i++) { + var exchange = exchanges[i]; + var exchangeId = oid(exchange._object_id); + exchange = exchange._values; + + if (config._ignoreDefault && !exchange.name) continue; + if (filterMatch(exchange.name, filter)) { + console.log("Exchange '" + exchange.name + "' (" + exchange.type + ")"); + for (var j = 0; j < bindings.length; j++) { + var bind = bindings[j]._values; + var exchangeRef = oid(bind.exchangeRef); + + if (exchangeRef === exchangeId) { + var queue = queues[oid(bind.queueRef)]; + var queueName = queue ? queue._values.name : "<unknown>"; + console.log(" bind [" + bind.bindingKey + "] => " + queueName + + renderArguments(bind.arguments)); + } + } + } + } + messenger.stop(); + }); +}; + +var queueList = function(filter) { + correlator.request( + // Send the QMF query requests for the specified classes. 
+ getObjects('org.apache.qpid.broker', 'queue'), + getObjects('org.apache.qpid.broker', 'exchange') + ).then(function(objects) { + var queues = objects.queue; + var exMap = idMap(objects.exchange); + var caption = "Queue Name"; + var maxNameLen = caption.length; + var found = false; + for (var i = 0; i < queues.length; i++) { + var queue = queues[i]._values; + if (filterMatch(queue.name, filter)) { + if (queue.name.length > maxNameLen) { + maxNameLen = queue.name.length; + } + found = true; + } + } + if (!found) { + config._returnCode = 1; + return; + } + + var pad = Array(maxNameLen + 1 - caption.length).join(' '); + console.log(caption + pad + " Attributes"); + console.log(Array(maxNameLen + caption.length + 3).join('=')); + + for (var i = 0; i < queues.length; i++) { + var queue = queues[i]._values; + if (filterMatch(queue.name, filter)) { + var pad2 = Array(maxNameLen + 2 - queue.name.length).join(' '); + var string = queue.name + pad2; + var args = queue.arguments ? queue.arguments : {}; + if (queue.durable) { + string += ' --durable'; + } + if (args[REPLICATE]) { + string += ' --replicate=' + args[REPLICATE]; + } + if (queue.autoDelete) { + string += ' auto-del'; + } + if (queue.exclusive) { + string += ' excl'; + } + if (args[FILESIZE]) { + string += ' --file-size=' + args[FILESIZE]; + } + if (args[FILECOUNT]) { + string += ' --file-count=' + args[FILECOUNT]; + } + if (args[MAX_QUEUE_SIZE]) { + string += ' --max-queue-size=' + args[MAX_QUEUE_SIZE]; + } + if (args[MAX_QUEUE_COUNT]) { + string += ' --max-queue-count=' + args[MAX_QUEUE_COUNT]; + } + if (args[POLICY_TYPE]) { + string += ' --limit-policy=' + args[POLICY_TYPE].replace("_", "-"); + } + if (args[LVQ_KEY]) { + string += ' --lvq-key=' + args[LVQ_KEY]; + } + if (args[QUEUE_EVENT_GENERATION]) { + string += ' --generate-queue-events=' + args[QUEUE_EVENT_GENERATION]; + } + if (queue.altExchange) { + string += ' --alternate-exchange=' + exMap[oid(queue.altExchange)]._values.name; + } + if 
(args[FLOW_STOP_SIZE]) { + string += ' --flow-stop-size=' + args[FLOW_STOP_SIZE]; + } + if (args[FLOW_RESUME_SIZE]) { + string += ' --flow-resume-size=' + args[FLOW_RESUME_SIZE]; + } + if (args[FLOW_STOP_COUNT]) { + string += ' --flow-stop-count=' + args[FLOW_STOP_COUNT]; + } + if (args[FLOW_RESUME_COUNT]) { + string += ' --flow-resume-count=' + args[FLOW_RESUME_COUNT]; + } + if (args[MSG_GROUP_HDR_KEY]) { + string += ' --group-header=' + args[MSG_GROUP_HDR_KEY]; + } + if (args[SHARED_MSG_GROUP] === 1) { + string += ' --shared-groups'; + } + string += renderArguments(args, true); + console.log(string); + } + } + messenger.stop(); + }); +}; + +var queueListRecurse = function(filter) { + correlator.request( + // Send the QMF query requests for the specified classes. + getObjects('org.apache.qpid.broker', 'queue'), + getObjects('org.apache.qpid.broker', 'exchange'), + getObjects('org.apache.qpid.broker', 'binding') + ).then(function(objects) { + var queues = objects.queue; + var bindings = objects.binding; + var exchanges = idMap(objects.exchange); + + for (var i = 0; i < queues.length; i++) { + var queue = queues[i]; + var queueId = oid(queue._object_id); + queue = queue._values; + + if (filterMatch(queue.name, filter)) { + console.log("Queue '" + queue.name + "'"); + for (var j = 0; j < bindings.length; j++) { + var bind = bindings[j]._values; + var queueRef = oid(bind.queueRef); + + if (queueRef === queueId) { + var exchange = exchanges[oid(bind.exchangeRef)]; + var exchangeName = "<unknown>"; + if (exchange) { + exchangeName = exchange._values.name; + if (exchangeName === '') { + if (config._ignoreDefault) continue; + exchangeName = "''"; + } + } + + console.log(" bind [" + bind.bindingKey + "] => " + exchangeName + + renderArguments(bind.arguments)); + } + } + } + } + messenger.stop(); + }); +}; + +/** + * The following methods implement adding and deleting various Broker Management + * Objects via QMF. 
Although <qpid>/cpp/src/qpid/broker/management-schema.xml + * describes the basic method schema, for example: + * <method name="create" desc="Create an object of the specified type"> + * <arg name="type" dir="I" type="sstr" desc="The type of object to create"/> + * <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/> + * <arg name="properties" dir="I" type="map" desc="Type specific object properties"/> + * <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/> + * </method> + * + * <method name="delete" desc="Delete an object of the specified type"> + * <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/> + * <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/> + * <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/> + * </method> + * + * What the schema doesn't do however is to explain what the properties/options + * Map values actually mean, unfortunately these aren't documented anywhere so + * the only option is to look in the code, the best place to look is in: + * <qpid>/cpp/src/qpid/broker/Broker.cpp, the method Broker::ManagementMethod is + * the best place to start, then Broker::createObject and Broker::deleteObject + * even then it's pretty hard to figure out all that is possible. + */ + +var handleMethodResponse = function(response, dontStop) { +console.log("Method result"); + if (response._arguments) { + //console.log(response._arguments); + } if (response._values) { + console.error("Exception from Agent: " + renderArguments(response._values)); + } + // Mostly we want to stop the Messenger Event loop and exit when a QMF method + // call returns, but sometimes we don't. 
+ if (!dontStop) { + messenger.stop(); + } +} + +var addExchange = function(args) { + if (args.length < 2) { + usage(); + } + + var etype = args[0]; + var ename = args[1]; + var declArgs = {}; + + declArgs['exchange-type'] = etype; + + for (var a = 0; a < config._extra_arguments.length; a++) { + var r = config._extra_arguments[a].split('='); + var value = null; + if (r.length === 2) { + value = r[1]; + } + declArgs[r[0]] = value; + } + + if (config._msgSequence) { + declArgs[MSG_SEQUENCE] = 1; + } + + if (config._ive) { + declArgs[IVE] = 1; + } + + if (config._altern_ex) { + declArgs['alternate-exchange'] = config._altern_ex; + } + + if (config._durable) { + declArgs['durable'] = 1; + } + + if (config._replicate) { + declArgs[REPLICATE] = config._replicate; + } + + correlator.request( + // We invoke the CRUD methods on the broker object. + getObjects('org.apache.qpid.broker', 'broker') + ).then(function(objects) { + var broker = objects.broker[0]; + correlator.request( + invokeMethod(broker, 'create', { + "type": "exchange", + "name": ename, + "properties": declArgs, + "strict": true}) + ).then(handleMethodResponse); + }); +}; + +var delExchange = function(args) { + if (args.length < 1) { + usage(); + } + + var ename = args[0]; + + correlator.request( + // We invoke the CRUD methods on the broker object. 
+ getObjects('org.apache.qpid.broker', 'broker') + ).then(function(objects) { + var broker = objects.broker[0]; + correlator.request( + invokeMethod(broker, 'delete', { + "type": "exchange", + "name": ename}) + ).then(handleMethodResponse); + }); +}; + +var addQueue = function(args) { + if (args.length < 1) { + usage(); + } + + var qname = args[0]; + var declArgs = {}; + + for (var a = 0; a < config._extra_arguments.length; a++) { + var r = config._extra_arguments[a].split('='); + var value = null; + if (r.length === 2) { + value = r[1]; + } + declArgs[r[0]] = value; + } + + if (config._durable) { + // allow the default fileCount and fileSize specified + // in qpid config file to take prededence + if (config._fileCount) { + declArgs[FILECOUNT] = config._fileCount; + } + if (config._fileSize) { + declArgs[FILESIZE] = config._fileSize; + } + } + + if (config._maxQueueSize != null) { + declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize; + } + + if (config._maxQueueCount != null) { + declArgs[MAX_QUEUE_COUNT] = config._maxQueueCount; + } + + if (config._limitPolicy) { + if (config._limitPolicy === 'none') { + } else if (config._limitPolicy === 'reject') { + declArgs[POLICY_TYPE] = 'reject'; + } else if (config._limitPolicy === 'ring') { + declArgs[POLICY_TYPE] = 'ring'; + } + } + + if (config._lvq_key) { + declArgs[LVQ_KEY] = config._lvq_key; + } + + if (config._eventGeneration) { + declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration; + } + + if (config._flowStopSize != null) { + declArgs[FLOW_STOP_SIZE] = config._flowStopSize; + } + + if (config._flowResumeSize != null) { + declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize; + } + + if (config._flowStopCount != null) { + declArgs[FLOW_STOP_COUNT] = config._flowStopCount; + } + + if (config._flowResumeCount != null) { + declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount; + } + + if (config._msgGroupHeader) { + declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader; + } + + if (config._sharedMsgGroup) { + 
declArgs[SHARED_MSG_GROUP] = 1; + } + + if (config._altern_ex) { + declArgs['alternate-exchange'] = config._altern_ex; + } + + if (config._durable) { + declArgs['durable'] = 1; + } + + if (config._replicate) { + declArgs[REPLICATE] = config._replicate; + } + + // This block is a little complex and untidy, the real issue is that the + // correlator object isn't as good as a real Promise and doesn't support + // chaining of "then" calls, so where we have complex dependencies we still + // get somewhat into "callback hell". TODO improve the correlator. + correlator.request( + // We invoke the CRUD methods on the broker object. + getObjects('org.apache.qpid.broker', 'broker') + ).then(function(objects) { + var broker = objects.broker[0]; + correlator.request( + invokeMethod(broker, 'create', { + "type": "queue", + "name": qname, + "properties": declArgs, + "strict": true}) + ).then(function(response) { + if (config._start_replica) { + handleMethodResponse(response, true); // The second parameter prevents exiting. + // TODO test this stuff! + correlator.request( + getObjects('org.apache.qpid.ha', 'habroker') // Not sure if this is correct + ).then(function(objects) { + if (objects.habroker.length > 0) { + var habroker = objects.habroker[0]; + correlator.request( + invokeMethod(habroker, 'replicate', { + "broker": config._start_replica, + "queue": qname}) + ).then(handleMethodResponse); + } else { + messenger.stop(); + } + }); + } else { + handleMethodResponse(response); + } + }); + }); +}; + +var delQueue = function(args) { + if (args.length < 1) { + usage(); + } + + var qname = args[0]; + + correlator.request( + // We invoke the CRUD methods on the broker object. 
+ getObjects('org.apache.qpid.broker', 'broker') + ).then(function(objects) { + var broker = objects.broker[0]; + correlator.request( + invokeMethod(broker, 'delete', { + "type": "queue", + "name": qname, + "options": {"if_empty": config._if_empty, + "if_unused": config._if_unused}}) + ).then(handleMethodResponse); + }); +}; + +var snarf_header_args = function(args) { + if (args.length < 2) { + console.log("Invalid args to bind headers: need 'any'/'all' plus conditions"); + return false; + } + + var op = args[0]; + if (op === 'all' || op === 'any') { + kv = {}; + var bindings = Array.prototype.slice.apply(args, [1]); + for (var i = 0; i < bindings.length; i++) { + var binding = bindings[i]; + binding = binding.split(",")[0]; + binding = binding.split("="); + kv[binding[0]] = binding[1]; + } + kv['x-match'] = op; + return kv; + } else { + console.log("Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'"); + return false; + } +}; + +var bind = function(args) { +console.log("bind"); +console.log(args); + + if (args.length < 2) { + usage(); + } + + var ename = args[0]; + var qname = args[1]; + var key = ''; + + if (args.length > 2) { + key = args[2]; + } + + correlator.request( + // We invoke the CRUD methods on the broker object. + getObjects('org.apache.qpid.broker', 'broker'), + getObjects('org.apache.qpid.broker', 'exchange') // Get exchanges to look up exchange type. + ).then(function(objects) { + var exchanges = objects.exchange; + + var etype = ''; + for (var i = 0; i < exchanges.length; i++) { + var exchange = exchanges[i]._values; + if (exchange.name === ename) { + etype = exchange.type; + break; + } + } + + // type of the xchg determines the processing of the rest of + // argv. if it's an xml xchg, we want to find a file + // containing an x-query, and pass that. if it's a headers + // exchange, we need to pass either "any" or all, followed by a + // map containing key/value pairs. if neither of those, extra + // args are ignored. 
+ var declArgs = {}; + if (etype === 'xml') { + + + } else if (etype === 'headers') { + declArgs = snarf_header_args(Array.prototype.slice.apply(args, [3])); + } +console.log(declArgs); + + if (typeof declArgs !== 'object') { + process.exit(1); + } + + var broker = objects.broker[0]; + correlator.request( + invokeMethod(broker, 'create', { + "type": "binding", + "name": ename + '/' + qname + '/' + key, + "properties": declArgs, + "strict": true}) + ).then(handleMethodResponse); + }); + +/* + + ok = True + _args = {} + if not res: + pass + elif res.type == "xml": + # this checks/imports the -f arg + [ok, xquery] = snarf_xquery_args() + _args = { "xquery" : xquery } + else: + if res.type == "headers": + [ok, op, kv] = snarf_header_args(args[3:]) + _args = kv + _args["x-match"] = op + + if not ok: + sys.exit(1) + + self.broker.bind(ename, qname, key, _args) +*/ + +}; + +var unbind = function(args) { +console.log("unbind"); +console.log(args); + + if (args.length < 2) { + usage(); + } + + var ename = args[0]; + var qname = args[1]; + var key = ''; + + if (args.length > 2) { + key = args[2]; + } + + correlator.request( + // We invoke the CRUD methods on the broker object. + getObjects('org.apache.qpid.broker', 'broker') + ).then(function(objects) { + var broker = objects.broker[0]; + correlator.request( + invokeMethod(broker, 'delete', { + "type": "binding", + "name": ename + '/' + qname + '/' + key}) + ).then(handleMethodResponse); + }); +}; + +/** + * The following methods are "generic" create and delete methods to for arbitrary + * Management Objects e.g. Incoming, Outgoing, Domain, Topic, QueuePolicy, + * TopicPolicy etc. use --argument k1=v1 --argument k2=v2 --argument k3=v3 to + * pass arbitrary arguments as key/value pairs to the Object being created/deleted. 
+ */ + +var createObject = function(type, name, args) { +console.log("createObject"); +console.log(type); +console.log(name); +console.log(args); + +}; + +var deleteObject = function(args) { +console.log("deleteObject"); +console.log(args); + +}; + + +/*********************** process command line options ************************/ + +var params = []; +var extra_arguments = []; +var args = process.argv.slice(2); +if (args.length > 0) { + if (args[0] === '-h' || args[0] === '--help') { + console.log(_usage); + console.log(_description); + console.log(_options); + process.exit(0); + } + + for (var i = 0; i < args.length; i++) { + var arg = args[i]; + if (arg === '-r' || arg === '--recursive') { + config._recursive = true; + } else if (arg === '--ignore-default') { + config._ignoreDefault = true; + } else if (arg === '--durable') { + config._durable = true; + } else if (arg === '--shared-groups') { + config._sharedMsgGroup = true; + } else if (arg === '--sequence') { + config._sequence = true; + } else if (arg === '--ive') { + config._ive = true; + } else if (arg === '--force') { + config._if_empty = false; + config._if_unused = false; + } else if (arg === '--force-if-not-empty') { + config._if_empty = false; + } else if (arg === '--force-if-used') { + config._if_unused = false; + } else if (arg === '--sequence') { + config._msgSequence = true; + } else if (arg.charAt(0) === '-') { + i++; + var val = args[i]; + if (arg === '-t' || arg === '--timeout') { + config._connTimeout = parseInt(val); + if (config._connTimeout === 0) { + config._connTimeout = null; + } + } else if (arg === '-b' || arg === '--broker' || arg === '-b' || arg === '--broker-addr') { + config._host = val; + if (config._host == null) { + config._host = 'localhost:5672'; + } + } else if (arg === '--alternate-exchange') { + config._altern_ex = val; + } else if (arg === '--replicate') { + if (!REPLICATE_LEVELS[val]) { + console.error("Invalid replication level " + val + ", should be one of 'none', 
'configuration' or 'all'"); + } + config._replicate = val; + } else if (arg === '--file-count') { + config._fileCount = parseInt(val); + } else if (arg === '--file-size') { + config._fileSize = parseInt(val); + } else if (arg === '--max-queue-size') { + config._maxQueueSize = parseInt(val); + } else if (arg === '--max-queue-count') { + config._maxQueueCount = parseInt(val); + } else if (arg === '--limit-policy') { + config._limitPolicy = val; + } else if (arg === '--lvq-key') { + config._lvq_key = val; + } else if (arg === '--generate-queue-events') { + config._eventGeneration = parseInt(val); + } else if (arg === '--flow-stop-size') { + config._flowStopSize = parseInt(val); + } else if (arg === '--flow-resume-size') { + config._flowResumeSize = parseInt(val); + } else if (arg === '--flow-stop-count') { + config._flowStopCount = parseInt(val); + } else if (arg === '--flow-resume-count') { + config._flowResumeCount = parseInt(val); + } else if (arg === '--group-header') { + config._msgGroupHeader = val; + } else if (arg === '--argument') { + extra_arguments.push(val); + } else if (arg === '--start-replica') { + config._start_replica = val; + } else if (arg === '--f' || arg === '--file') { // TODO Won't work in node.js + config._file = val; + } else if (arg === '--show-property') { + config._list_properties = val; + } + } else { + params.push(arg); + } + } +} + +config._extra_arguments = extra_arguments; + +console.log("params"); +console.log(params); + +// The command only *actually* gets called when the QMF connection has actually +// been established so we wrap up the function we want to get called in a lambda. 
+var command = function() {overview();}; +if (params.length > 0) { + var cmd = params[0]; + var modifier = ''; + if (params.length > 1) { + modifier = params[1]; + } + + if (cmd === 'exchanges') { + if (config._recursive) { + command = function() {exchangeListRecurse(modifier);}; + } else { + command = function() {exchangeList(modifier);}; + } + } else if (cmd === 'queues') { + if (config._recursive) { + command = function() {queueListRecurse(modifier);}; + } else { + command = function() {queueList(modifier);}; + } + } else if (cmd === 'add') { + if (modifier === 'exchange') { + command = function() {addExchange(Array.prototype.slice.apply(params, [2]));}; + } else if (modifier === 'queue') { + command = function() {addQueue(Array.prototype.slice.apply(params, [2]));}; + } else if (params.length > 2) { + command = function() {createObject(modifier, params[2], config.getOptions());}; + } else { + usage(); + } + } else if (cmd === 'del') { + if (modifier === 'exchange') { + command = function() {delExchange(Array.prototype.slice.apply(params, [2]));}; + } else if (modifier === 'queue') { + command = function() {delQueue(Array.prototype.slice.apply(params, [2]));}; + } else if (params.length > 2) { + command = function() {deleteObject(modifier, params[2], {});}; + } else { + usage(); + } + } else if (cmd === 'bind') { + command = function() {bind(Array.prototype.slice.apply(params, [1]));}; + } else if (cmd === 'unbind') { + command = function() {unbind(Array.prototype.slice.apply(params, [1]));}; + } +} + +var onSubscription = function() { + command(); +}; + + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/examples/messenger/javascript/recv.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/recv.js b/examples/messenger/javascript/recv.js new file mode 100644 index 0000000..3d2b468 --- /dev/null +++ b/examples/messenger/javascript/recv.js @@ -0,0 +1,67 @@ +#!/usr/bin/env node +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Check if the environment is Node.js and if so import the required library. +if (typeof exports !== "undefined" && exports !== null) { + proton = require("qpid-proton"); +} + +var address = "amqp://~0.0.0.0"; +var message = new proton.Message(); +var messenger = new proton.Messenger(); + +var pumpData = function() { + while (messenger.incoming()) { + var t = messenger.get(message); + + console.log("Address: " + message.getAddress()); + console.log("Subject: " + message.getSubject()); + + // body is the body as a native JavaScript Object, useful for most real cases. 
+ //console.log("Content: " + message.body); + + // data is the body as a proton.Data Object, used in this case because + // format() returns exactly the same representation as recv.c + console.log("Content: " + message.data.format()); + + messenger.accept(t); + } +}; + +var args = process.argv.slice(2); +if (args.length > 0) { + if (args[0] === '-h' || args[0] === '--help') { + console.log("Usage: node recv.js <addr> (default " + address + ")"); + process.exit(0); + } + + address = args[0]; +} + +messenger.setIncomingWindow(1024); + +messenger.on('error', function(error) {console.log(error);}); +messenger.on('work', pumpData); +messenger.start(); + +messenger.subscribe(address); +messenger.recv(); // Receive as many messages as messenger can buffer. + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a61e5f9c/examples/messenger/javascript/send.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/send.js b/examples/messenger/javascript/send.js new file mode 100644 index 0000000..77a605c --- /dev/null +++ b/examples/messenger/javascript/send.js @@ -0,0 +1,92 @@ +#!/usr/bin/env node +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +// Check if the environment is Node.js and if so import the required library. +if (typeof exports !== "undefined" && exports !== null) { + proton = require("qpid-proton"); +} + +var address = "amqp://0.0.0.0"; +var subject = "UK.WEATHER"; +var msgtext = "Hello World!"; +var tracker = null; +var running = true; + +var message = new proton.Message(); +var messenger = new proton.Messenger(); + +var pumpData = function() { + var status = messenger.status(tracker); + if (status != proton.Status.PENDING) { + if (running) { + messenger.stop(); + running = false; + } + } + + if (messenger.isStopped()) { + message.free(); + messenger.free(); + } +}; + +var args = process.argv.slice(2); +if (args.length > 0) { + if (args[0] === '-h' || args[0] === '--help') { + console.log("Usage: node send.js [options] [message]"); + console.log("Options:"); + console.log(" -a <addr> The target address [amqp[s]://domain[/name]] (default " + address + ")"); + console.log(" -s <subject> The message subject (default " + subject + ")"); + console.log("message A text string to send."); + process.exit(0); + } + + for (var i = 0; i < args.length; i++) { + var arg = args[i]; + if (arg.charAt(0) === '-') { + i++; + var val = args[i]; + if (arg === '-a') { + address = val; + } else if (arg === '-s') { + subject = val; + } + } else { + msgtext = arg; + } + } +} + +console.log("Address: " + address); +console.log("Subject: " + subject); +console.log("Content: " + msgtext); + +messenger.on('error', function(error) {console.log(error);}); +messenger.on('work', pumpData); +messenger.setOutgoingWindow(1024); +messenger.start(); + +message.setAddress(address); +message.setSubject(subject); +message.body = msgtext; + +tracker = messenger.put(message); + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
