Repository: qpid-proton Updated Branches: refs/heads/master 37a0d6b07 -> cc791045e
PROTON-818: Reactor C soak test Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cc791045 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cc791045 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cc791045 Branch: refs/heads/master Commit: cc791045eb725ee60da4e3f2e62d7f35c9d82455 Parents: 37a0d6b Author: Clifford Jansen <[email protected]> Authored: Wed Feb 25 23:33:09 2015 -0800 Committer: Clifford Jansen <[email protected]> Committed: Wed Feb 25 23:33:09 2015 -0800 ---------------------------------------------------------------------- tests/python/proton_tests/common.py | 38 +++ tests/python/proton_tests/soak.py | 9 + tests/tools/apps/c/CMakeLists.txt | 8 +- tests/tools/apps/c/reactor-recv.c | 447 +++++++++++++++++++++++++++++++ tests/tools/apps/c/reactor-send.c | 389 +++++++++++++++++++++++++++ 5 files changed, 889 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cc791045/tests/python/proton_tests/common.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/common.py b/tests/python/proton_tests/common.py index 42cf7b1..1b8dbdb 100644 --- a/tests/python/proton_tests/common.py +++ b/tests/python/proton_tests/common.py @@ -448,3 +448,41 @@ class MessengerReceiverPython(MessengerReceiver): +class ReactorSenderC(MessengerSender): + def __init__(self): + MessengerSender.__init__(self) + self._command = ["reactor-send"] + +class ReactorSenderValgrind(ReactorSenderC): + """ Run the C sender under Valgrind + """ + def __init__(self, suppressions=None): + if "VALGRIND" not in os.environ: + raise Skipped("Skipping test - $VALGRIND not set.") + ReactorSenderC.__init__(self) + if not suppressions: + suppressions = os.path.join(os.path.dirname(__file__), + "valgrind.supp" ) + self._command = [os.environ["VALGRIND"], "--error-exitcode=1", "--quiet", + "--trace-children=yes", "--leak-check=full", + "--suppressions=%s" % suppressions] + self._command + +class ReactorReceiverC(MessengerReceiver): + def __init__(self): + MessengerReceiver.__init__(self) + self._command = ["reactor-recv"] + +class ReactorReceiverValgrind(ReactorReceiverC): + """ Run the C receiver under Valgrind + """ + def __init__(self, suppressions=None): + if "VALGRIND" not in os.environ: + raise Skipped("Skipping test - $VALGRIND not set.") + ReactorReceiverC.__init__(self) + if not suppressions: + suppressions = os.path.join(os.path.dirname(__file__), + "valgrind.supp" ) + self._command = [os.environ["VALGRIND"], "--error-exitcode=1", "--quiet", + "--trace-children=yes", "--leak-check=full", + "--suppressions=%s" % suppressions] + self._command + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cc791045/tests/python/proton_tests/soak.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/soak.py b/tests/python/proton_tests/soak.py index 94ce488..9e5ceab 100644 --- a/tests/python/proton_tests/soak.py +++ b/tests/python/proton_tests/soak.py @@ -22,6 +22,8 @@ from common import Test, Skipped, free_tcp_ports, \ MessengerReceiverC, MessengerSenderC, \ MessengerReceiverValgrind, MessengerSenderValgrind, \ MessengerReceiverPython, MessengerSenderPython, \ + ReactorReceiverC, ReactorSenderC, \ + ReactorReceiverValgrind, ReactorSenderValgrind, \ isSSLPresent from proton import * @@ -354,3 +356,10 @@ class MessengerTests(AppTests): def test_star_topology_C_Python(self): self._do_star_topology_test( MessengerReceiverPython, MessengerSenderC ) + + def test_oneway_reactor(self): + self._do_oneway_test(ReactorReceiverC(), ReactorSenderC()) + + def test_oneway_reactor_valgrind(self): + self.valgrind_test() + self._do_oneway_test(ReactorReceiverValgrind(), ReactorSenderValgrind()) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cc791045/tests/tools/apps/c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/tests/tools/apps/c/CMakeLists.txt b/tests/tools/apps/c/CMakeLists.txt index deafe24..9507c1f 100644 --- a/tests/tools/apps/c/CMakeLists.txt +++ b/tests/tools/apps/c/CMakeLists.txt @@ -33,17 +33,21 @@ endif (INTTYPES_AVAILABLE) add_executable(msgr-recv msgr-recv.c msgr-common.c) add_executable(msgr-send msgr-send.c msgr-common.c) +add_executable(reactor-recv reactor-recv.c msgr-common.c) +add_executable(reactor-send reactor-send.c msgr-common.c) target_link_libraries(msgr-recv qpid-proton) target_link_libraries(msgr-send qpid-proton) +target_link_libraries(reactor-recv qpid-proton) +target_link_libraries(reactor-send qpid-proton) set_target_properties ( - msgr-recv msgr-send + msgr-recv msgr-send reactor-recv reactor-send PROPERTIES COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS}" COMPILE_DEFINITIONS "${PLATFORM_DEFINITIONS}" ) if (BUILD_WITH_CXX) - set_source_files_properties (msgr-recv.c msgr-send.c msgr-common.c PROPERTIES LANGUAGE CXX) + set_source_files_properties (msgr-recv.c msgr-send.c msgr-common.c reactor-recv.c reactor-send.c PROPERTIES LANGUAGE CXX) endif (BUILD_WITH_CXX) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cc791045/tests/tools/apps/c/reactor-recv.c ---------------------------------------------------------------------- diff --git a/tests/tools/apps/c/reactor-recv.c b/tests/tools/apps/c/reactor-recv.c new file mode 100644 index 0000000..d7b92bc --- /dev/null +++ b/tests/tools/apps/c/reactor-recv.c @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * Implements a subset of msgr-recv.c using reactor events. + */ + +#include "proton/message.h" +#include "proton/error.h" +#include "proton/types.h" +#include "proton/reactor.h" +#include "proton/handlers.h" +#include "proton/engine.h" +#include "proton/url.h" +#include "msgr-common.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <ctype.h> + +// The exact struct from msgr-recv, mostly fallow. +typedef struct { + Addresses_t subscriptions; + uint64_t msg_count; + int recv_count; + int incoming_window; + int timeout; // seconds + unsigned int report_interval; // in seconds + int outgoing_window; + int reply; + const char *name; + const char *ready_text; + char *certificate; + char *privatekey; // used to sign certificate + char *password; // for private key file + char *ca_db; // trusted CA database +} Options_t; + + +static void usage(int rc) +{ + printf("Usage: reactor-recv [OPTIONS] \n" + " -c # \tNumber of messages to receive before exiting [0=forever]\n" + " -R \tSend reply if 'reply-to' present\n" + " -t # \tInactivity timeout in seconds, -1 = no timeout [-1]\n" + " -X <text> \tPrint '<text>\\n' to stdout after all subscriptions are created\n" + ); + exit(rc); +} + + +// Global context for this process +typedef struct { + Options_t *opts; + Statistics_t *stats; + uint64_t sent; + uint64_t received; + pn_message_t *message; + pn_acceptor_t *acceptor; + char *encoded_data; + size_t encoded_data_size; + int connections; + pn_list_t *active_connections; + bool shutting_down; + pn_handler_t *listener_handler; + int quiesce_count; +} global_context_t; + +// Per connection context +typedef struct { + global_context_t *global; + int connection_id; + pn_link_t *recv_link; + pn_link_t *reply_link; +} connection_context_t; + + +static char *ensure_buffer(char *buf, size_t needed, size_t *actual) +{ + // Make room for the largest message seen so far, plus extra for slight changes in metadata content + if (needed + 1024 <= *actual) + return buf; + needed += 2048; + buf = (char *) realloc(buf, needed); + *actual = buf ? needed : 0; + return buf; +} + +void global_shutdown(global_context_t *gc) +{ + if (gc->shutting_down) return; + gc->shutting_down = true; + pn_acceptor_close(gc->acceptor); + size_t n = pn_list_size(gc->active_connections); + for (size_t i = 0; i < n; i++) { + pn_connection_t *conn = (pn_connection_t *) pn_list_get(gc->active_connections, i); + if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) { + pn_connection_close(conn); + } + } +} + +connection_context_t *connection_context(pn_handler_t *h) +{ + connection_context_t *p = (connection_context_t *) pn_handler_mem(h); + return p; +} + +void connection_context_init(connection_context_t *cc, global_context_t *gc) +{ + cc->global = gc; + pn_incref(gc->listener_handler); + cc->connection_id = gc->connections++; + cc->recv_link = 0; + cc->reply_link = 0; +} + +void connection_cleanup(pn_handler_t *h) +{ + connection_context_t *cc = connection_context(h); + // Undo pn_incref() from connection_context_init() + pn_decref(cc->global->listener_handler); +} + +void connection_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) +{ + connection_context_t *cc = connection_context(h); + bool replying = cc->global->opts->reply; + + switch (type) { + case PN_LINK_REMOTE_OPEN: + { + pn_link_t *link = pn_event_link(event); + if (pn_link_is_receiver(link)) { + check(cc->recv_link == NULL, "Multiple incomming links on one connection"); + cc->recv_link = link; + pn_connection_t *conn = pn_event_connection(event); + pn_list_add(cc->global->active_connections, conn); + if (cc->global->shutting_down) { + pn_connection_close(conn); + break; + } + if (replying) { + // Set up a reply link and defer granting credit to the incoming link + pn_connection_t *conn = pn_session_connection(pn_link_session(link)); + pn_session_t *ssn = pn_session(conn); + pn_session_open(ssn); + char name[100]; // prefer a multiplatform uuid generator + sprintf(name, "reply_sender_%d", cc->connection_id); + cc->reply_link = pn_sender(ssn, name); + pn_link_open(cc->reply_link); + } + else { + pn_flowcontroller_t *fc = pn_flowcontroller(1024); + pn_handler_add(h, fc); + pn_decref(fc); + } + } + } + break; + case PN_LINK_FLOW: + { + if (replying) { + pn_link_t *reply_link = pn_event_link(event); + // pn_flowcontroller handles the non-reply case + check(reply_link == cc->reply_link, "internal error"); + + // Grant the sender as much credit as just given to us for replies + int delta = pn_link_credit(reply_link) - pn_link_credit(cc->recv_link); + if (delta > 0) + pn_link_flow(cc->recv_link, delta); + } + } + break; + case PN_DELIVERY: + { + pn_link_t *recv_link = pn_event_link(event); + pn_delivery_t *dlv = pn_event_delivery(event); + if (pn_link_is_receiver(recv_link) && !pn_delivery_partial(dlv)) { + if (cc->global->received == 0) statistics_start(cc->global->stats); + + size_t encoded_size = pn_delivery_pending(dlv); + cc->global->encoded_data = ensure_buffer(cc->global->encoded_data, encoded_size, + &cc->global->encoded_data_size); + check(cc->global->encoded_data, "decoding buffer realloc failure"); + + ssize_t n = pn_link_recv(recv_link, cc->global->encoded_data, encoded_size); + check(n == (ssize_t) encoded_size, "message data read fail"); + pn_message_t *msg = cc->global->message; + int err = pn_message_decode(msg, cc->global->encoded_data, n); + check(err == 0, "message decode error"); + cc->global->received++; + pn_delivery_settle(dlv); + statistics_msg_received(cc->global->stats, msg); + + if (replying) { + const char *reply_addr = pn_message_get_reply_to(msg); + if (reply_addr) { + pn_link_t *rl = cc->reply_link; + check(pn_link_credit(rl) > 0, "message received without corresponding reply credit"); + LOG("Replying to: %s\n", reply_addr ); + + pn_message_set_address(msg, reply_addr); + pn_message_set_creation_time(msg, msgr_now()); + + char tag[8]; + void *ptr = &tag; + *((uint64_t *) ptr) = cc->global->sent; + pn_delivery_t *dlv = pn_delivery(rl, pn_dtag(tag, 8)); + size_t size = cc->global->encoded_data_size; + int err = pn_message_encode(msg, cc->global->encoded_data, &size); + check(err == 0, "message encoding error"); + pn_link_send(rl, cc->global->encoded_data, size); + pn_delivery_settle(dlv); + + cc->global->sent++; + } + } + } + if (cc->global->received >= cc->global->opts->msg_count) { + global_shutdown(cc->global); + } + } + break; + case PN_CONNECTION_UNBOUND: + { + pn_connection_t *conn = pn_event_connection(event); + pn_list_remove(cc->global->active_connections, conn); + pn_connection_release(conn); + } + break; + default: + break; + } +} + +pn_handler_t *connection_handler(global_context_t *gc) +{ + pn_handler_t *h = pn_handler_new(connection_dispatch, sizeof(connection_context_t), connection_cleanup); + connection_context_t *cc = connection_context(h); + connection_context_init(cc, gc); + return h; +} + + +void start_listener(global_context_t *gc, pn_reactor_t *reactor) +{ + check(gc->opts->subscriptions.count > 0, "no listening address"); + pn_url_t *listen_url = pn_url_parse(gc->opts->subscriptions.addresses[0]); + const char *host = pn_url_get_host(listen_url); + const char *port = pn_url_get_port(listen_url); + if (port == 0 || strlen(port) == 0) + port = "5672"; + if (host == 0 || strlen(host) == 0) + host = "0.0.0.0"; + if (*host == '~') host++; + gc->acceptor = pn_reactor_acceptor(reactor, host, port, NULL); + check(gc->acceptor, "acceptor creation failed"); + pn_url_free(listen_url); +} + +void global_context_init(global_context_t *gc, Options_t *o, Statistics_t *s) +{ + gc->opts = o; + gc->stats = s; + gc->sent = 0; + gc->received = 0; + gc->encoded_data_size = 0; + gc->encoded_data = 0; + gc->message = pn_message(); + check(gc->message, "failed to allocate a message"); + gc->connections = 0; + gc->active_connections = pn_list(PN_OBJECT, 0); + gc->acceptor = 0; + gc->shutting_down = false; + gc->listener_handler = 0; + gc->quiesce_count = 0; +} + +global_context_t *global_context(pn_handler_t *h) +{ + return (global_context_t *) pn_handler_mem(h); +} + +void listener_cleanup(pn_handler_t *h) +{ + global_context_t *gc = global_context(h); + pn_message_free(gc->message); + free(gc->encoded_data); + pn_free(gc->active_connections); +} + +void listener_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) +{ + global_context_t *gc = global_context(h); + if (type == PN_REACTOR_QUIESCED) + gc->quiesce_count++; + else + gc->quiesce_count = 0; + + switch (type) { + case PN_CONNECTION_INIT: + { + pn_connection_t *connection = pn_event_connection(event); + + // New incoming connection on listener socket. Give each a separate handler. + pn_handler_t *ch = connection_handler(gc); + pn_handshaker_t *handshaker = pn_handshaker(); + pn_handler_add(ch, handshaker); + pn_decref(handshaker); + pn_record_t *record = pn_connection_attachments(connection); + pn_record_set_handler(record, ch); + pn_decref(ch); + } + break; + case PN_REACTOR_QUIESCED: + { + // Two quiesce in a row means we have been idle for a timout period + if (gc->opts->timeout != -1 && gc->quiesce_count > 1) + global_shutdown(gc); + } + break; + case PN_REACTOR_INIT: + { + pn_reactor_t *reactor = pn_event_reactor(event); + start_listener(gc, reactor); + + // hack to let test scripts know when the receivers are ready (so + // that the senders may be started) + if (gc->opts->ready_text) { + fprintf(stdout, "%s\n", gc->opts->ready_text); + fflush(stdout); + } + if (gc->opts->timeout != -1) + pn_reactor_set_timeout(pn_event_reactor(event), gc->opts->timeout); + } + break; + case PN_REACTOR_FINAL: + { + if (gc->received == 0) statistics_start(gc->stats); + statistics_report(gc->stats, gc->sent, gc->received); + } + break; + default: + break; + } +} + +pn_handler_t *listener_handler(Options_t *opts, Statistics_t *stats) +{ + pn_handler_t *h = pn_handler_new(listener_dispatch, sizeof(global_context_t), listener_cleanup); + global_context_t *gc = global_context(h); + global_context_init(gc, opts, stats); + gc->listener_handler = h; + return h; +} + +static void parse_options( int argc, char **argv, Options_t *opts ) +{ + int c; + opterr = 0; + + memset( opts, 0, sizeof(*opts) ); + opts->recv_count = -1; + opts->timeout = -1; + addresses_init( &opts->subscriptions); + + while ((c = getopt(argc, argv, + "a:c:b:w:t:e:RW:F:VN:X:T:C:K:P:")) != -1) { + switch (c) { + case 'a': + { + // TODO: multiple addresses? + char *comma = strchr(optarg, ','); + check(comma == 0, "multiple addresses not implemented"); + check(opts->subscriptions.count == 0, "multiple addresses not implemented"); + addresses_merge( &opts->subscriptions, optarg ); + } + break; + case 'c': + if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 't': + if (sscanf( optarg, "%d", &opts->timeout ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + if (opts->timeout > 0) opts->timeout *= 1000; + break; + case 'R': opts->reply = 1; break; + case 'V': enable_logging(); break; + case 'X': opts->ready_text = optarg; break; + default: + usage(1); + } + } + + if (opts->subscriptions.count == 0) addresses_add( &opts->subscriptions, + "amqp://~0.0.0.0" ); +} + +int main(int argc, char** argv) +{ + Options_t opts; + Statistics_t stats; + parse_options( argc, argv, &opts ); + pn_reactor_t *reactor = pn_reactor(); + + // set up default handlers for our reactor + pn_handler_t *root = pn_reactor_get_handler(reactor); + pn_handler_t *lh = listener_handler(&opts, &stats); + pn_handler_add(root, lh); + pn_handshaker_t *handshaker = pn_handshaker(); + pn_handler_add(root, handshaker); + + // Omit decrefs else segfault. Not sure why they are necessary + // to keep valgrind happy for the connection_handler, but not here. + // pn_decref(handshaker); + // pn_decref(lh); + + pn_reactor_run(reactor); + pn_reactor_free(reactor); + + addresses_free( &opts.subscriptions ); + return 0; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cc791045/tests/tools/apps/c/reactor-send.c ---------------------------------------------------------------------- diff --git a/tests/tools/apps/c/reactor-send.c b/tests/tools/apps/c/reactor-send.c new file mode 100644 index 0000000..271efe3 --- /dev/null +++ b/tests/tools/apps/c/reactor-send.c @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * Implements a subset of msgr-send.c using reactor events. + */ + +#include "proton/message.h" +#include "proton/error.h" +#include "proton/types.h" +#include "proton/reactor.h" +#include "proton/handlers.h" +#include "proton/engine.h" +#include "proton/url.h" +#include "msgr-common.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <ctype.h> + + +typedef struct { + Addresses_t targets; + uint64_t msg_count; + uint32_t msg_size; // of body + uint32_t send_batch; + int outgoing_window; + unsigned int report_interval; // in seconds + //Addresses_t subscriptions; + //Addresses_t reply_tos; + int get_replies; + int timeout; // in seconds + int incoming_window; + int recv_count; + const char *name; + char *certificate; + char *privatekey; // used to sign certificate + char *password; // for private key file + char *ca_db; // trusted CA database +} Options_t; + + +static void usage(int rc) +{ + printf("Usage: reactor-send [OPTIONS] \n" + " -a <addr> \tThe target address [amqp[s]://domain[/name]]\n" + " -c # \tNumber of messages to send before exiting [0=forever]\n" + " -b # \tSize of message body in bytes [1024]\n" + " -R \tWait for a reply to each sent message\n" + " -V \tEnable debug logging\n" + ); + exit(rc); +} + + +typedef struct { + Options_t *opts; + Statistics_t *stats; + uint64_t sent; + uint64_t received; + pn_message_t *message; + pn_message_t *reply_message; + pn_atom_t id; + char *encoded_data; + size_t encoded_data_size; + pn_url_t *send_url; + pn_string_t *hostname; + pn_string_t *container_id; +} sender_context_t; + +void sender_context_init(sender_context_t *sc, Options_t *opts, Statistics_t *stats) +{ + sc->opts = opts; + sc->stats = stats; + sc->sent = 0; + sc->received = 0; + sc->id.type = PN_ULONG; + sc->reply_message = 0; + // 4096 extra bytes should easily cover the message metadata + sc->encoded_data_size = sc->opts->msg_size + 4096; + sc->encoded_data = (char *)calloc(1, sc->encoded_data_size); + check(sc->encoded_data, "failed to allocate encoding buffer"); + sc->container_id = pn_string("reactor-send"); // prefer uuid-like name + + sc->reply_message = (sc->opts->get_replies) ? pn_message() : 0; + sc->message = pn_message(); + check(sc->message, "failed to allocate a message"); + pn_string_t *rpto = pn_string("amqp://"); + pn_string_addf(rpto, "%s", pn_string_get(sc->container_id)); + pn_message_set_reply_to(sc->message, pn_string_get(rpto)); + pn_free(rpto); + pn_data_t *body = pn_message_body(sc->message); + // borrow the encoding buffer this one time + char *data = sc->encoded_data; + pn_data_put_binary(body, pn_bytes(sc->opts->msg_size, data)); + + check(sc->opts->targets.count > 0, "no specified address"); + sc->send_url = pn_url_parse(sc->opts->targets.addresses[0]); + const char *host = pn_url_get_host(sc->send_url); + const char *port = pn_url_get_port(sc->send_url); + sc->hostname = pn_string(host); + if (port && strlen(port)) + pn_string_addf(sc->hostname, ":%s", port); +} + +sender_context_t *sender_context(pn_handler_t *h) +{ + return (sender_context_t *) pn_handler_mem(h); +} + +void sender_cleanup(pn_handler_t *h) +{ + sender_context_t *sc = sender_context(h); + pn_message_free(sc->message); + pn_message_free(sc->reply_message); + pn_url_free(sc->send_url); + pn_free(sc->hostname); + pn_free(sc->container_id); + free(sc->encoded_data); +} + +pn_handler_t *replyto_handler(sender_context_t *sc); + +void sender_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) +{ + sender_context_t *sc = sender_context(h); + + switch (type) { + case PN_CONNECTION_INIT: + { + pn_connection_t *conn = pn_event_connection(event); + pn_connection_set_container(conn, pn_string_get(sc->container_id)); + pn_connection_set_hostname(conn, pn_string_get(sc->hostname)); + pn_connection_open(conn); + pn_session_t *ssn = pn_session(conn); + pn_session_open(ssn); + pn_link_t *snd = pn_sender(ssn, "sender"); + const char *path = pn_url_get_path(sc->send_url); + if (path && strlen(path)) { + pn_terminus_set_address(pn_link_target(snd), path); + pn_terminus_set_address(pn_link_source(snd), path); + } + pn_link_open(snd); + } + break; + case PN_LINK_FLOW: + { + pn_link_t *snd = pn_event_link(event); + while (pn_link_credit(snd) > 0 && sc->sent < sc->opts->msg_count) { + if (sc->sent == 0) + statistics_start(sc->stats); + + char tag[8]; + void *ptr = &tag; + *((uint64_t *) ptr) = sc->sent; + pn_delivery_t *dlv = pn_delivery(snd, pn_dtag(tag, 8)); + + // setup the message to send + pn_message_t *msg = sc->message; + pn_message_set_address(msg, sc->opts->targets.addresses[0]); + sc->id.u.as_ulong = sc->sent; + pn_message_set_correlation_id(msg, sc->id); + pn_message_set_creation_time(msg, msgr_now()); + + size_t size = sc->encoded_data_size; + int err = pn_message_encode(msg, sc->encoded_data, &size); + check(err == 0, "message encoding error"); + pn_link_send(snd, sc->encoded_data, size); + pn_delivery_settle(dlv); + sc->sent++; + } + if (sc->sent == sc->opts->msg_count && !sc->opts->get_replies) { + pn_link_close(snd); + pn_connection_t *conn = pn_event_connection(event); + pn_connection_close(conn); + } + } + break; + case PN_LINK_INIT: + { + pn_link_t *link = pn_event_link(event); + if (pn_link_is_receiver(link)) { + // Response messages link. Could manage credit and deliveries in this handler but + // a dedicated handler also works. + pn_handler_t *replyto = replyto_handler(sc); + pn_flowcontroller_t *fc = pn_flowcontroller(1024); + pn_handler_add(replyto, fc); + pn_decref(fc); + pn_handshaker_t *handshaker = pn_handshaker(); + pn_handler_add(replyto, handshaker); + pn_decref(handshaker); + pn_record_t *record = pn_link_attachments(link); + pn_record_set_handler(record, replyto); + pn_decref(replyto); + } + } + break; + case PN_CONNECTION_LOCAL_CLOSE: + { + statistics_report(sc->stats, sc->sent, sc->received); + } + break; + default: + break; + } +} + +pn_handler_t *sender_handler(Options_t *opts, Statistics_t *stats) +{ + pn_handler_t *h = pn_handler_new(sender_dispatch, sizeof(sender_context_t), sender_cleanup); + sender_context_t *sc = sender_context(h); + sender_context_init(sc, opts, stats); + return h; +} + +sender_context_t *replyto_sender_context(pn_handler_t *h) +{ + sender_context_t **p = (sender_context_t **) pn_handler_mem(h); + return *p; +} + +void replyto_cleanup(pn_handler_t *h) +{} + +void replyto_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) { + sender_context_t *sc = replyto_sender_context(h); + + switch (type) { + case PN_DELIVERY: + { + check(sc->opts->get_replies, "Unexpected reply message"); + pn_link_t *recv_link = pn_event_link(event); + pn_delivery_t *dlv = pn_event_delivery(event); + if (pn_link_is_receiver(recv_link) && !pn_delivery_partial(dlv)) { + size_t encoded_size = pn_delivery_pending(dlv); + check(encoded_size <= sc->encoded_data_size, "decoding buffer too small"); + ssize_t n = pn_link_recv(recv_link, sc->encoded_data, encoded_size); + check(n == (ssize_t)encoded_size, "read fail on reply link"); + pn_message_t *msg = sc->reply_message; + int err = pn_message_decode(msg, sc->encoded_data, n); + check(err == 0, "message decode error"); + statistics_msg_received(sc->stats, msg); + sc->received++; + pn_delivery_settle(dlv); + } + if (sc->received == sc->opts->msg_count) { + pn_link_close(recv_link); + pn_connection_t *conn = pn_event_connection(event); + pn_connection_close(conn); + } + } + break; + default: + break; + } +} + +pn_handler_t *replyto_handler(sender_context_t *sc) +{ + pn_handler_t *h = pn_handler_new(replyto_dispatch, sizeof(sender_context_t *), replyto_cleanup); + sender_context_t **p = (sender_context_t **) pn_handler_mem(h); + *p = sc; + return h; +} + +static void parse_options( int argc, char **argv, Options_t *opts ) +{ + int c; + opterr = 0; + + memset( opts, 0, sizeof(*opts) ); + opts->msg_size = 1024; + opts->send_batch = 1024; + opts->timeout = -1; + opts->recv_count = -1; + addresses_init(&opts->targets); + + while ((c = getopt(argc, argv, + "a:c:b:p:w:e:l:Rt:W:B:VN:T:C:K:P:")) != -1) { + switch(c) { + case 'a': + { + // TODO: multiple addresses? To keep tests happy, accept multiple for now, + // but ignore all but the first. + addresses_merge( &opts->targets, optarg ); + } + break; + case 'c': + if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'b': + if (sscanf( optarg, "%u", &opts->msg_size ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'p': + if (sscanf( optarg, "%u", &opts->send_batch ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'w': + if (sscanf( optarg, "%d", &opts->outgoing_window ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'e': + if (sscanf( optarg, "%u", &opts->report_interval ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'R': opts->get_replies = 1; break; + case 't': + if (sscanf( optarg, "%d", &opts->timeout ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + if (opts->timeout > 0) opts->timeout *= 1000; + break; + case 'W': + if (sscanf( optarg, "%d", &opts->incoming_window ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'B': + if (sscanf( optarg, "%d", &opts->recv_count ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'V': enable_logging(); break; + case 'N': opts->name = optarg; break; + case 'T': opts->ca_db = optarg; break; + case 'C': opts->certificate = optarg; break; + case 'K': opts->privatekey = optarg; break; + case 'P': parse_password( optarg, &opts->password ); break; + + default: + usage(1); + } + } + + // default target if none specified + if (opts->targets.count == 0) addresses_add( &opts->targets, "amqp://0.0.0.0" ); +} + + +int main(int argc, char** argv) +{ + Options_t opts; + Statistics_t stats; + parse_options( argc, argv, &opts ); + + pn_reactor_t *reactor = pn_reactor(); + pn_handler_t *sh = sender_handler(&opts, &stats); + pn_handler_add(sh, pn_handshaker()); + pn_reactor_connection(reactor, sh); + pn_reactor_run(reactor); + pn_reactor_free(reactor); + + pn_handler_free(sh); + addresses_free(&opts.targets); + return 0; +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
