Repository: qpid-proton
Updated Branches:
  refs/heads/master 37a0d6b07 -> cc791045e


PROTON-818: Reactor C soak test


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cc791045
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cc791045
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cc791045

Branch: refs/heads/master
Commit: cc791045eb725ee60da4e3f2e62d7f35c9d82455
Parents: 37a0d6b
Author: Clifford Jansen <[email protected]>
Authored: Wed Feb 25 23:33:09 2015 -0800
Committer: Clifford Jansen <[email protected]>
Committed: Wed Feb 25 23:33:09 2015 -0800

----------------------------------------------------------------------
 tests/python/proton_tests/common.py |  38 +++
 tests/python/proton_tests/soak.py   |   9 +
 tests/tools/apps/c/CMakeLists.txt   |   8 +-
 tests/tools/apps/c/reactor-recv.c   | 447 +++++++++++++++++++++++++++++++
 tests/tools/apps/c/reactor-send.c   | 389 +++++++++++++++++++++++++++
 5 files changed, 889 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cc791045/tests/python/proton_tests/common.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/common.py b/tests/python/proton_tests/common.py
index 42cf7b1..1b8dbdb 100644
--- a/tests/python/proton_tests/common.py
+++ b/tests/python/proton_tests/common.py
@@ -448,3 +448,41 @@ class MessengerReceiverPython(MessengerReceiver):
 
 
 
+class ReactorSenderC(MessengerSender):
+    """ Run the reactor-send application as the sender
+    """
+    def __init__(self):
+        MessengerSender.__init__(self)
+        # Replace the command set up by the base class with the reactor app.
+        self._command = ["reactor-send"]
+
+class ReactorSenderValgrind(ReactorSenderC):
+    """ Run the C sender under Valgrind
+    """
+    def __init__(self, suppressions=None):
+        if "VALGRIND" not in os.environ:
+            raise Skipped("Skipping test - $VALGRIND not set.")
+        ReactorSenderC.__init__(self)
+        if not suppressions:
+            suppressions = os.path.join(os.path.dirname(__file__),
+                                        "valgrind.supp" )
+        self._command = [os.environ["VALGRIND"], "--error-exitcode=1", 
"--quiet",
+                         "--trace-children=yes", "--leak-check=full",
+                         "--suppressions=%s" % suppressions] + self._command
+
+class ReactorReceiverC(MessengerReceiver):
+    """ Run the reactor-recv application as the receiver
+    """
+    def __init__(self):
+        MessengerReceiver.__init__(self)
+        # Replace the command set up by the base class with the reactor app.
+        self._command = ["reactor-recv"]
+
+class ReactorReceiverValgrind(ReactorReceiverC):
+    """ Run the C receiver under Valgrind
+    """
+    def __init__(self, suppressions=None):
+        if "VALGRIND" not in os.environ:
+            raise Skipped("Skipping test - $VALGRIND not set.")
+        ReactorReceiverC.__init__(self)
+        if not suppressions:
+            suppressions = os.path.join(os.path.dirname(__file__),
+                                        "valgrind.supp" )
+        self._command = [os.environ["VALGRIND"], "--error-exitcode=1", 
"--quiet",
+                         "--trace-children=yes", "--leak-check=full",
+                         "--suppressions=%s" % suppressions] + self._command
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cc791045/tests/python/proton_tests/soak.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/soak.py b/tests/python/proton_tests/soak.py
index 94ce488..9e5ceab 100644
--- a/tests/python/proton_tests/soak.py
+++ b/tests/python/proton_tests/soak.py
@@ -22,6 +22,8 @@ from common import Test, Skipped, free_tcp_ports, \
     MessengerReceiverC, MessengerSenderC, \
     MessengerReceiverValgrind, MessengerSenderValgrind, \
     MessengerReceiverPython, MessengerSenderPython, \
+    ReactorReceiverC, ReactorSenderC, \
+    ReactorReceiverValgrind, ReactorSenderValgrind, \
     isSSLPresent
 from proton import *
 
@@ -354,3 +356,10 @@ class MessengerTests(AppTests):
 
     def test_star_topology_C_Python(self):
         self._do_star_topology_test( MessengerReceiverPython, MessengerSenderC )
+
+    def test_oneway_reactor(self):
+        # Run the shared one-way test with the reactor-based C receiver and sender.
+        self._do_oneway_test(ReactorReceiverC(), ReactorSenderC())
+
+    def test_oneway_reactor_valgrind(self):
+        self.valgrind_test()
+        self._do_oneway_test(ReactorReceiverValgrind(), 
ReactorSenderValgrind())

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cc791045/tests/tools/apps/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/tools/apps/c/CMakeLists.txt b/tests/tools/apps/c/CMakeLists.txt
index deafe24..9507c1f 100644
--- a/tests/tools/apps/c/CMakeLists.txt
+++ b/tests/tools/apps/c/CMakeLists.txt
@@ -33,17 +33,21 @@ endif (INTTYPES_AVAILABLE)
 
 add_executable(msgr-recv msgr-recv.c msgr-common.c)
 add_executable(msgr-send msgr-send.c msgr-common.c)
+add_executable(reactor-recv reactor-recv.c msgr-common.c)
+add_executable(reactor-send reactor-send.c msgr-common.c)
 
 target_link_libraries(msgr-recv qpid-proton)
 target_link_libraries(msgr-send qpid-proton)
+target_link_libraries(reactor-recv qpid-proton)
+target_link_libraries(reactor-send qpid-proton)
 
 set_target_properties (
-  msgr-recv msgr-send
+  msgr-recv msgr-send reactor-recv reactor-send
   PROPERTIES
   COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS}"
   COMPILE_DEFINITIONS "${PLATFORM_DEFINITIONS}"
 )
 
 if (BUILD_WITH_CXX)
-  set_source_files_properties (msgr-recv.c msgr-send.c msgr-common.c PROPERTIES LANGUAGE CXX)
+  set_source_files_properties (msgr-recv.c msgr-send.c msgr-common.c reactor-recv.c reactor-send.c PROPERTIES LANGUAGE CXX)
 endif (BUILD_WITH_CXX)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cc791045/tests/tools/apps/c/reactor-recv.c
----------------------------------------------------------------------
diff --git a/tests/tools/apps/c/reactor-recv.c b/tests/tools/apps/c/reactor-recv.c
new file mode 100644
index 0000000..d7b92bc
--- /dev/null
+++ b/tests/tools/apps/c/reactor-recv.c
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * Implements a subset of msgr-recv.c using reactor events.
+ */
+
+#include "proton/message.h"
+#include "proton/error.h"
+#include "proton/types.h"
+#include "proton/reactor.h"
+#include "proton/handlers.h"
+#include "proton/engine.h"
+#include "proton/url.h"
+#include "msgr-common.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <ctype.h>
+
+// Command line options.  This is the exact struct from msgr-recv; most
+// fields are fallow (unused) here.  In this file only subscriptions,
+// msg_count, timeout, reply and ready_text are read after parsing.
+typedef struct {
+  Addresses_t subscriptions;  // -a: listen address; only the first entry is used
+  uint64_t msg_count;  // -c: number of messages to receive before shutting down
+  int recv_count;
+  int incoming_window;
+  int timeout;  // -t: given in seconds, stored by parse_options() in milliseconds; -1 = no timeout
+  unsigned int report_interval;  // in seconds
+  int   outgoing_window;
+  int   reply;  // -R: non-zero to send replies
+  const char *name;
+  const char *ready_text;  // -X: text printed to stdout once the listener is started
+  char *certificate;
+  char *privatekey;   // used to sign certificate
+  char *password;     // for private key file
+  char *ca_db;        // trusted CA database
+} Options_t;
+
+
+/*
+ * Print the command line summary to stdout and terminate the process.
+ *
+ * rc: exit status passed to exit().  Does not return.
+ */
+static void usage(int rc)
+{
+    printf("Usage: reactor-recv [OPTIONS] \n"
+           " -a <addr> \tThe address to listen on [amqp://~0.0.0.0]\n"
+           " -c # \tNumber of messages to receive before exiting [0=forever]\n"
+           " -R \tSend reply if 'reply-to' present\n"
+           " -t # \tInactivity timeout in seconds, -1 = no timeout [-1]\n"
+           " -V \tEnable debug logging\n"
+           " -X <text> \tPrint '<text>\\n' to stdout after all subscriptions are created\n"
+           );
+    exit(rc);
+}
+
+
+// Global context for this process.  It lives in the memory of the listener
+// handler (see listener_handler()) and is shared by all connection handlers.
+typedef struct {
+  Options_t *opts;              // parsed command line options
+  Statistics_t *stats;          // accumulator handed to the statistics_*() helpers
+  uint64_t sent;                // replies sent so far
+  uint64_t received;            // complete messages received so far
+  pn_message_t *message;        // scratch message reused for every decode/encode
+  pn_acceptor_t *acceptor;      // listening acceptor created by start_listener()
+  char *encoded_data;           // encode/decode buffer, grown by ensure_buffer()
+  size_t encoded_data_size;     // current size of encoded_data in bytes
+  int connections;              // connection handlers created; supplies connection_id
+  pn_list_t *active_connections; // connections with an incoming link, closed by global_shutdown()
+  bool shutting_down;           // set once by global_shutdown()
+  pn_handler_t *listener_handler; // the handler whose memory holds this struct
+  int quiesce_count;            // consecutive PN_REACTOR_QUIESCED events seen
+} global_context_t;
+
+// Per connection context, stored in the memory of each connection handler.
+typedef struct {
+  global_context_t *global;  // shared process-wide state
+  int connection_id;         // sequence number taken from global->connections
+  pn_link_t *recv_link;      // the single incoming link; NULL until it is opened remotely
+  pn_link_t *reply_link;     // sender link for replies; only created when replying
+} connection_context_t;
+
+
+/*
+ * Make sure a heap buffer can hold `needed` bytes plus 1024 bytes of headroom.
+ *
+ * buf:    current buffer (may be NULL), previously obtained from this function
+ * needed: number of bytes about to be stored
+ * actual: in/out, current capacity of buf; updated when the buffer is resized
+ *
+ * Returns the (possibly moved) buffer.  On allocation failure the old buffer
+ * is released, *actual is set to 0 and NULL is returned.
+ */
+static char *ensure_buffer(char *buf, size_t needed, size_t *actual)
+{
+  // Make room for the largest message seen so far, plus extra for slight changes in metadata content
+  if (needed + 1024 <= *actual)
+    return buf;
+  needed += 2048;
+  // Cast kept: this file may be compiled as C++ (BUILD_WITH_CXX).
+  char *grown = (char *) realloc(buf, needed);
+  if (!grown) {
+    // realloc leaves the old block allocated on failure.  Callers overwrite
+    // their only pointer with our return value, so free it here to avoid a leak.
+    free(buf);
+    *actual = 0;
+    return NULL;
+  }
+  *actual = needed;
+  return grown;
+}
+
+/*
+ * Start an orderly shutdown: stop accepting new connections and close every
+ * tracked connection that has not already been closed locally.  Only the
+ * first call has any effect.
+ */
+void global_shutdown(global_context_t *gc)
+{
+  if (gc->shutting_down) return;
+  gc->shutting_down = true;
+  pn_acceptor_close(gc->acceptor);
+
+  size_t total = pn_list_size(gc->active_connections);
+  for (size_t idx = 0; idx < total; idx++) {
+    pn_connection_t *c = (pn_connection_t *) pn_list_get(gc->active_connections, idx);
+    if (pn_connection_state(c) & PN_LOCAL_CLOSED)
+      continue;  // already closed from our side
+    pn_connection_close(c);
+  }
+}
+
+// Return the per connection context stored in the handler's memory.
+connection_context_t *connection_context(pn_handler_t *h)
+{
+  return (connection_context_t *) pn_handler_mem(h);
+}
+
+/*
+ * Initialize a fresh per connection context.  Takes a reference on the
+ * listener handler (dropped again in connection_cleanup()) and assigns the
+ * next connection id from the global counter.
+ */
+void connection_context_init(connection_context_t *cc, global_context_t *gc)
+{
+  cc->recv_link = NULL;
+  cc->reply_link = NULL;
+  cc->global = gc;
+  cc->connection_id = gc->connections++;
+  pn_incref(gc->listener_handler);
+}
+
+// Handler finalizer: balance the pn_incref() taken in connection_context_init().
+void connection_cleanup(pn_handler_t *h)
+{
+  global_context_t *gc = connection_context(h)->global;
+  pn_decref(gc->listener_handler);
+}
+
+/*
+ * Event handler for one accepted connection.
+ *
+ * PN_LINK_REMOTE_OPEN:   records the single incoming link, adds the connection
+ *                        to the global active list, then either opens a reply
+ *                        link (when replying) or installs a flow controller.
+ * PN_LINK_FLOW:          when replying, passes credit received on the reply
+ *                        link on to the incoming link.
+ * PN_DELIVERY:           decodes each complete incoming message, optionally
+ *                        sends it back on the reply link, and starts shutdown
+ *                        once msg_count messages have arrived (0 = never).
+ * PN_CONNECTION_UNBOUND: removes the connection from the active list and
+ *                        releases it.
+ */
+void connection_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type)
+{
+  connection_context_t *cc = connection_context(h);
+  bool replying = cc->global->opts->reply;
+
+  switch (type) {
+  case PN_LINK_REMOTE_OPEN:
+    {
+      pn_link_t *link = pn_event_link(event);
+      if (pn_link_is_receiver(link)) {
+        check(cc->recv_link == NULL, "Multiple incomming links on one connection");
+        cc->recv_link = link;
+        pn_connection_t *conn = pn_event_connection(event);
+        pn_list_add(cc->global->active_connections, conn);
+        if (cc->global->shutting_down) {
+          pn_connection_close(conn);
+          break;
+        }
+        if (replying) {
+          // Set up a reply link and defer granting credit to the incoming link
+          pn_connection_t *reply_conn = pn_session_connection(pn_link_session(link));
+          pn_session_t *ssn = pn_session(reply_conn);
+          pn_session_open(ssn);
+          char name[100]; // prefer a multiplatform uuid generator
+          snprintf(name, sizeof name, "reply_sender_%d", cc->connection_id);
+          cc->reply_link = pn_sender(ssn, name);
+          pn_link_open(cc->reply_link);
+        }
+        else {
+          pn_flowcontroller_t *fc = pn_flowcontroller(1024);
+          pn_handler_add(h, fc);
+          pn_decref(fc);
+        }
+      }
+    }
+    break;
+  case PN_LINK_FLOW:
+    {
+      if (replying) {
+        pn_link_t *reply_link = pn_event_link(event);
+        // pn_flowcontroller handles the non-reply case
+        check(reply_link == cc->reply_link, "internal error");
+
+        // Grant the sender as much credit as just given to us for replies
+        int delta = pn_link_credit(reply_link) - pn_link_credit(cc->recv_link);
+        if (delta > 0)
+          pn_link_flow(cc->recv_link, delta);
+      }
+    }
+    break;
+  case PN_DELIVERY:
+    {
+      pn_link_t *recv_link = pn_event_link(event);
+      pn_delivery_t *dlv = pn_event_delivery(event);
+      if (pn_link_is_receiver(recv_link) && !pn_delivery_partial(dlv)) {
+        if (cc->global->received == 0) statistics_start(cc->global->stats);
+
+        size_t encoded_size = pn_delivery_pending(dlv);
+        cc->global->encoded_data = ensure_buffer(cc->global->encoded_data, encoded_size,
+                                                 &cc->global->encoded_data_size);
+        check(cc->global->encoded_data, "decoding buffer realloc failure");
+
+        ssize_t n = pn_link_recv(recv_link, cc->global->encoded_data, encoded_size);
+        check(n == (ssize_t) encoded_size, "message data read fail");
+        pn_message_t *msg = cc->global->message;
+        int err = pn_message_decode(msg, cc->global->encoded_data, n);
+        check(err == 0, "message decode error");
+        cc->global->received++;
+        pn_delivery_settle(dlv);
+        statistics_msg_received(cc->global->stats, msg);
+
+        if (replying) {
+          const char *reply_addr = pn_message_get_reply_to(msg);
+          if (reply_addr) {
+            pn_link_t *rl = cc->reply_link;
+            check(pn_link_credit(rl) > 0, "message received without corresponding reply credit");
+            LOG("Replying to: %s\n", reply_addr );
+
+            pn_message_set_address(msg, reply_addr);
+            pn_message_set_creation_time(msg, msgr_now());
+
+            // Use the running reply count as the delivery tag.  Copy the bytes
+            // with memcpy: storing through a uint64_t pointer into a char
+            // array violates aliasing and alignment rules.
+            char tag[8];
+            uint64_t seq = cc->global->sent;
+            memcpy(tag, &seq, sizeof tag);
+            pn_delivery_t *reply_dlv = pn_delivery(rl, pn_dtag(tag, sizeof tag));
+            size_t size = cc->global->encoded_data_size;
+            int enc_err = pn_message_encode(msg, cc->global->encoded_data, &size);
+            check(enc_err == 0, "message encoding error");
+            pn_link_send(rl, cc->global->encoded_data, size);
+            pn_delivery_settle(reply_dlv);
+
+            cc->global->sent++;
+          }
+        }
+      }
+      // A message count of 0 means "receive forever" (see usage()), so only
+      // a non-zero limit can trigger shutdown.
+      uint64_t limit = cc->global->opts->msg_count;
+      if (limit != 0 && cc->global->received >= limit) {
+        global_shutdown(cc->global);
+      }
+    }
+    break;
+  case PN_CONNECTION_UNBOUND:
+    {
+      pn_connection_t *conn = pn_event_connection(event);
+      pn_list_remove(cc->global->active_connections, conn);
+      pn_connection_release(conn);
+    }
+    break;
+  default:
+    break;
+  }
+}
+
+// Create a handler for one accepted connection, with its context initialized.
+pn_handler_t *connection_handler(global_context_t *gc)
+{
+  pn_handler_t *handler = pn_handler_new(connection_dispatch, sizeof(connection_context_t), connection_cleanup);
+  connection_context_init(connection_context(handler), gc);
+  return handler;
+}
+
+
+/*
+ * Create the listening acceptor from the first subscription address and store
+ * it in gc->acceptor.  A missing port defaults to "5672", a missing host to
+ * "0.0.0.0", and a leading '~' on the host is skipped.  Any failure is fatal
+ * via check().
+ */
+void start_listener(global_context_t *gc, pn_reactor_t *reactor)
+{
+  check(gc->opts->subscriptions.count > 0, "no listening address");
+  pn_url_t *listen_url = pn_url_parse(gc->opts->subscriptions.addresses[0]);
+  // Guard against an unparsable address before reading host/port from it.
+  check(listen_url != NULL, "invalid listening address");
+  const char *host = pn_url_get_host(listen_url);
+  const char *port = pn_url_get_port(listen_url);
+  if (port == NULL || strlen(port) == 0)
+    port = "5672";
+  if (host == NULL || strlen(host) == 0)
+    host = "0.0.0.0";
+  if (*host == '~') host++;
+  gc->acceptor = pn_reactor_acceptor(reactor, host, port, NULL);
+  check(gc->acceptor, "acceptor creation failed");
+  // host and port may point into listen_url, so free it only after use.
+  pn_url_free(listen_url);
+}
+
+/*
+ * Initialize the process-wide context: zero the counters, allocate the
+ * scratch message and the list of active connections.  Allocation failures
+ * are fatal via check().  listener_handler is left unset for the caller.
+ */
+void global_context_init(global_context_t *gc, Options_t *o, Statistics_t *s)
+{
+  gc->opts = o;
+  gc->stats = s;
+  gc->sent = 0;
+  gc->received = 0;
+  gc->encoded_data_size = 0;
+  gc->encoded_data = NULL;
+  gc->message = pn_message();
+  check(gc->message, "failed to allocate a message");
+  gc->connections = 0;
+  gc->active_connections = pn_list(PN_OBJECT, 0);
+  // The list is used unconditionally later on, so fail early if it is missing.
+  check(gc->active_connections, "failed to allocate the connection list");
+  gc->acceptor = NULL;
+  gc->shutting_down = false;
+  gc->listener_handler = NULL;
+  gc->quiesce_count = 0;
+}
+
+// Return the global context stored in the listener handler's memory.
+global_context_t *global_context(pn_handler_t *h)
+{
+  void *mem = pn_handler_mem(h);
+  return (global_context_t *) mem;
+}
+
+// Listener handler finalizer: release the resources owned by the global context.
+void listener_cleanup(pn_handler_t *h)
+{
+  global_context_t *ctx = global_context(h);
+
+  pn_message_free(ctx->message);
+  free(ctx->encoded_data);
+  pn_free(ctx->active_connections);
+}
+
+/*
+ * Reactor-level event handler.
+ *
+ * PN_REACTOR_INIT:     starts the listener, prints the ready text if one was
+ *                      given, and arms the reactor timeout if one was given.
+ * PN_CONNECTION_INIT:  gives each new incoming connection its own handler.
+ * PN_REACTOR_QUIESCED: shuts down after two quiesce events in a row when a
+ *                      timeout is configured.
+ * PN_REACTOR_FINAL:    emits the statistics report.
+ */
+void listener_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type)
+{
+  global_context_t *gc = global_context(h);
+
+  // Count consecutive quiesce events; any other event resets the run.
+  gc->quiesce_count = (type == PN_REACTOR_QUIESCED) ? gc->quiesce_count + 1 : 0;
+
+  switch (type) {
+  case PN_REACTOR_INIT:
+    {
+      pn_reactor_t *reactor = pn_event_reactor(event);
+      start_listener(gc, reactor);
+
+      // hack to let test scripts know when the receivers are ready (so
+      // that the senders may be started)
+      if (gc->opts->ready_text) {
+        fprintf(stdout, "%s\n", gc->opts->ready_text);
+        fflush(stdout);
+      }
+      if (gc->opts->timeout != -1)
+        pn_reactor_set_timeout(pn_event_reactor(event), gc->opts->timeout);
+    }
+    break;
+  case PN_CONNECTION_INIT:
+    {
+      pn_connection_t *incoming = pn_event_connection(event);
+
+      // New incoming connection on listener socket.  Give each a separate handler.
+      pn_handler_t *conn_handler = connection_handler(gc);
+      pn_handshaker_t *hs = pn_handshaker();
+      pn_handler_add(conn_handler, hs);
+      pn_decref(hs);
+      pn_record_t *attachments = pn_connection_attachments(incoming);
+      pn_record_set_handler(attachments, conn_handler);
+      pn_decref(conn_handler);
+    }
+    break;
+  case PN_REACTOR_QUIESCED:
+    {
+      // Two quiesce events in a row mean we have been idle for a timeout period
+      bool timed_out = gc->opts->timeout != -1 && gc->quiesce_count > 1;
+      if (timed_out)
+        global_shutdown(gc);
+    }
+    break;
+  case PN_REACTOR_FINAL:
+    {
+      // Make sure the statistics clock was started even if nothing arrived.
+      if (gc->received == 0) statistics_start(gc->stats);
+      statistics_report(gc->stats, gc->sent, gc->received);
+    }
+    break;
+  default:
+    break;
+  }
+}
+
+// Create the listener handler; its memory holds the initialized global context.
+pn_handler_t *listener_handler(Options_t *opts, Statistics_t *stats)
+{
+  pn_handler_t *handler = pn_handler_new(listener_dispatch, sizeof(global_context_t), listener_cleanup);
+  global_context_t *ctx = global_context(handler);
+  global_context_init(ctx, opts, stats);
+  // Back pointer used by the connection handlers to hold a reference on us.
+  ctx->listener_handler = handler;
+  return handler;
+}
+
+/*
+ * Parse the command line into *opts.
+ *
+ * All fields are zeroed first; recv_count and timeout default to -1.  Only
+ * -a, -c, -t, -R, -V and -X are acted upon; any other option, including the
+ * remaining letters accepted by the option string, ends in usage(1).  If no
+ * address is given, "amqp://~0.0.0.0" is used.
+ */
+static void parse_options( int argc, char **argv, Options_t *opts )
+{
+    int c;
+    opterr = 0;
+
+    memset( opts, 0, sizeof(*opts) );
+    opts->recv_count = -1;
+    opts->timeout = -1;
+    addresses_init( &opts->subscriptions);
+
+    while ((c = getopt(argc, argv,
+                       "a:c:b:w:t:e:RW:F:VN:X:T:C:K:P:")) != -1) {
+        switch (c) {
+        case 'a':
+          {
+            // TODO: multiple addresses?
+            char *comma = strchr(optarg, ',');
+            check(comma == 0, "multiple addresses not implemented");
+            check(opts->subscriptions.count == 0, "multiple addresses not implemented");
+            addresses_merge( &opts->subscriptions, optarg );
+          }
+          break;
+        case 'c':
+            // Report the option being parsed (c).  optopt is only set by
+            // getopt() for unknown options or missing arguments.
+            if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) {
+                fprintf(stderr, "Option -%c requires an integer argument.\n", c);
+                usage(1);
+            }
+            break;
+        case 't':
+            if (sscanf( optarg, "%d", &opts->timeout ) != 1) {
+                fprintf(stderr, "Option -%c requires an integer argument.\n", c);
+                usage(1);
+            }
+            // Given in seconds, kept in milliseconds from here on.
+            if (opts->timeout > 0) opts->timeout *= 1000;
+            break;
+        case 'R': opts->reply = 1; break;
+        case 'V': enable_logging(); break;
+        case 'X': opts->ready_text = optarg; break;
+        default:
+            usage(1);
+        }
+    }
+
+    if (opts->subscriptions.count == 0) addresses_add( &opts->subscriptions,
+                                                       "amqp://~0.0.0.0" );
+}
+
+/*
+ * Entry point: parse the options, attach the listener handler and a
+ * handshaker to the reactor's root handler, run the reactor until it
+ * finishes, then free the reactor and the address list.
+ */
+int main(int argc, char** argv)
+{
+  Options_t opts;
+  Statistics_t stats;
+  parse_options( argc, argv, &opts );
+  pn_reactor_t *reactor = pn_reactor();
+
+  // set up default handlers for our reactor
+  pn_handler_t *root = pn_reactor_get_handler(reactor);
+  pn_handler_t *lh = listener_handler(&opts, &stats);
+  pn_handler_add(root, lh);
+  pn_handshaker_t *handshaker = pn_handshaker();
+  pn_handler_add(root, handshaker);
+
+  // Omit decrefs else segfault.  Not sure why they are necessary
+  // to keep valgrind happy for the connection_handler, but not here.
+  // NOTE(review): the other handlers in this file drop their creation
+  // reference after pn_handler_add(); confirm why doing so here crashes.
+  // pn_decref(handshaker);
+  // pn_decref(lh);
+
+  pn_reactor_run(reactor);
+  pn_reactor_free(reactor);
+
+  addresses_free( &opts.subscriptions );
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cc791045/tests/tools/apps/c/reactor-send.c
----------------------------------------------------------------------
diff --git a/tests/tools/apps/c/reactor-send.c b/tests/tools/apps/c/reactor-send.c
new file mode 100644
index 0000000..271efe3
--- /dev/null
+++ b/tests/tools/apps/c/reactor-send.c
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * Implements a subset of msgr-send.c using reactor events.
+ */
+
+#include "proton/message.h"
+#include "proton/error.h"
+#include "proton/types.h"
+#include "proton/reactor.h"
+#include "proton/handlers.h"
+#include "proton/engine.h"
+#include "proton/url.h"
+#include "msgr-common.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <ctype.h>
+
+
+// Command line options, filled in by parse_options().  Several fields are
+// parsed but not otherwise used in this file.
+typedef struct {
+    Addresses_t targets;   // -a: target addresses; only the first one is used
+    uint64_t msg_count;    // -c: number of messages to send
+    uint32_t msg_size;  // of body
+    uint32_t send_batch;
+    int   outgoing_window;
+    unsigned int report_interval;      // in seconds
+    //Addresses_t subscriptions;
+    //Addresses_t reply_tos;
+    int   get_replies;     // -R: non-zero to wait for a reply to each message
+    int   timeout;      // -t: given in seconds, stored multiplied by 1000 when positive
+    int   incoming_window;
+    int   recv_count;
+    const char *name;
+    char *certificate;
+    char *privatekey;   // used to sign certificate
+    char *password;     // for private key file
+    char *ca_db;        // trusted CA database
+} Options_t;
+
+
+/*
+ * Print the command line summary to stdout and terminate the process with
+ * exit status rc.  Does not return.
+ */
+static void usage(int rc)
+{
+    static const char help_text[] =
+           "Usage: reactor-send [OPTIONS] \n"
+           " -a <addr> \tThe target address [amqp[s]://domain[/name]]\n"
+           " -c # \tNumber of messages to send before exiting [0=forever]\n"
+           " -b # \tSize of message body in bytes [1024]\n"
+           " -R \tWait for a reply to each sent message\n"
+           " -V \tEnable debug logging\n";
+
+    fputs(help_text, stdout);
+    exit(rc);
+}
+
+
+// Sender state, stored in the memory of the sender handler.
+typedef struct {
+  Options_t *opts;             // parsed command line options
+  Statistics_t *stats;         // accumulator handed to the statistics_*() helpers
+  uint64_t sent;               // messages sent so far
+  uint64_t received;           // replies received so far
+  pn_message_t *message;       // outgoing message, re-encoded for every send
+  pn_message_t *reply_message; // scratch message for replies; 0 unless get_replies is set
+  pn_atom_t id;                // PN_ULONG atom used as the correlation id
+  char *encoded_data;          // encode/decode buffer
+  size_t encoded_data_size;    // size of encoded_data in bytes
+  pn_url_t *send_url;          // parsed form of the first target address
+  pn_string_t *hostname;       // host of send_url, plus ":port" when a port is given
+  pn_string_t *container_id;   // container name set on the connection
+} sender_context_t;
+
+/*
+ * Initialize the sender context: allocate the encoding buffer, build the
+ * outgoing message (reply-to address plus a binary body of msg_size bytes),
+ * optionally allocate the reply message, and parse the first target address
+ * into send_url and hostname.  All failures are fatal via check().
+ */
+void sender_context_init(sender_context_t *sc, Options_t *opts, Statistics_t *stats)
+{
+  sc->opts = opts;
+  sc->stats = stats;
+  sc->sent = 0;
+  sc->received = 0;
+  sc->id.type = PN_ULONG;
+  // 4096 extra bytes should easily cover the message metadata.  Widen before
+  // adding so a very large msg_size cannot wrap the 32-bit sum.
+  sc->encoded_data_size = (size_t) sc->opts->msg_size + 4096;
+  sc->encoded_data = (char *)calloc(1, sc->encoded_data_size);
+  check(sc->encoded_data, "failed to allocate encoding buffer");
+  sc->container_id = pn_string("reactor-send"); // prefer uuid-like name
+
+  // The reply message is only needed, and only allocated, when waiting for replies.
+  sc->reply_message = (sc->opts->get_replies) ? pn_message() : 0;
+  check(!sc->opts->get_replies || sc->reply_message, "failed to allocate a reply message");
+  sc->message = pn_message();
+  check(sc->message, "failed to allocate a message");
+  pn_string_t *rpto = pn_string("amqp://");
+  pn_string_addf(rpto, "%s", pn_string_get(sc->container_id));
+  pn_message_set_reply_to(sc->message, pn_string_get(rpto));
+  pn_free(rpto);
+  pn_data_t *body = pn_message_body(sc->message);
+  // borrow the encoding buffer this one time
+  char *data = sc->encoded_data;
+  pn_data_put_binary(body, pn_bytes(sc->opts->msg_size, data));
+
+  check(sc->opts->targets.count > 0, "no specified address");
+  sc->send_url = pn_url_parse(sc->opts->targets.addresses[0]);
+  // Guard against an unparsable address before reading host/port from it.
+  check(sc->send_url != NULL, "invalid target address");
+  const char *host = pn_url_get_host(sc->send_url);
+  const char *port = pn_url_get_port(sc->send_url);
+  sc->hostname = pn_string(host);
+  if (port && strlen(port))
+    pn_string_addf(sc->hostname, ":%s", port);
+}
+
+// Return the sender context stored in the handler's memory.
+sender_context_t *sender_context(pn_handler_t *h)
+{
+  void *mem = pn_handler_mem(h);
+  return (sender_context_t *) mem;
+}
+
+// Sender handler finalizer: release everything sender_context_init() allocated.
+void sender_cleanup(pn_handler_t *h)
+{
+  sender_context_t *ctx = sender_context(h);
+
+  pn_message_free(ctx->message);
+  pn_message_free(ctx->reply_message);
+  pn_url_free(ctx->send_url);
+  pn_free(ctx->hostname);
+  pn_free(ctx->container_id);
+  free(ctx->encoded_data);
+}
+
+pn_handler_t *replyto_handler(sender_context_t *sc);
+
+/*
+ * Event handler for the sending connection.
+ *
+ * PN_CONNECTION_INIT:        opens the connection, a session and the sender
+ *                            link, addressing the link with the URL path.
+ * PN_LINK_FLOW:              sends messages while credit is available and the
+ *                            requested count (0 = unlimited) is not reached;
+ *                            closes link and connection when done and no
+ *                            replies are awaited.
+ * PN_LINK_INIT:              gives an incoming (reply) link its own handler.
+ * PN_CONNECTION_LOCAL_CLOSE: emits the statistics report.
+ */
+void sender_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type)
+{
+  sender_context_t *sc = sender_context(h);
+
+  switch (type) {
+  case PN_CONNECTION_INIT:
+    {
+      pn_connection_t *conn = pn_event_connection(event);
+      pn_connection_set_container(conn, pn_string_get(sc->container_id));
+      pn_connection_set_hostname(conn, pn_string_get(sc->hostname));
+      pn_connection_open(conn);
+      pn_session_t *ssn = pn_session(conn);
+      pn_session_open(ssn);
+      pn_link_t *snd = pn_sender(ssn, "sender");
+      const char *path = pn_url_get_path(sc->send_url);
+      if (path && strlen(path)) {
+        pn_terminus_set_address(pn_link_target(snd), path);
+        pn_terminus_set_address(pn_link_source(snd), path);
+      }
+      pn_link_open(snd);
+    }
+    break;
+  case PN_LINK_FLOW:
+    {
+      pn_link_t *snd = pn_event_link(event);
+      // A message count of 0 means "send forever" (see usage()).
+      const uint64_t limit = sc->opts->msg_count;
+      while (pn_link_credit(snd) > 0 && (limit == 0 || sc->sent < limit)) {
+        if (sc->sent == 0)
+          statistics_start(sc->stats);
+
+        // Use the running send count as the delivery tag.  Copy the bytes
+        // with memcpy: storing through a uint64_t pointer into a char array
+        // violates aliasing and alignment rules.
+        char tag[8];
+        uint64_t seq = sc->sent;
+        memcpy(tag, &seq, sizeof tag);
+        pn_delivery_t *dlv = pn_delivery(snd, pn_dtag(tag, sizeof tag));
+
+        // setup the message to send
+        pn_message_t *msg = sc->message;
+        pn_message_set_address(msg, sc->opts->targets.addresses[0]);
+        sc->id.u.as_ulong = sc->sent;
+        pn_message_set_correlation_id(msg, sc->id);
+        pn_message_set_creation_time(msg, msgr_now());
+
+        size_t size = sc->encoded_data_size;
+        int err = pn_message_encode(msg, sc->encoded_data, &size);
+        check(err == 0, "message encoding error");
+        pn_link_send(snd, sc->encoded_data, size);
+        pn_delivery_settle(dlv);
+        sc->sent++;
+      }
+      // Only a finite run can complete; never close when sending forever.
+      if (limit != 0 && sc->sent == limit && !sc->opts->get_replies) {
+        pn_link_close(snd);
+        pn_connection_t *conn = pn_event_connection(event);
+        pn_connection_close(conn);
+      }
+    }
+    break;
+  case PN_LINK_INIT:
+    {
+      pn_link_t *link = pn_event_link(event);
+      if (pn_link_is_receiver(link)) {
+        // Response messages link.  Could manage credit and deliveries in this handler but
+        // a dedicated handler also works.
+        pn_handler_t *replyto = replyto_handler(sc);
+        pn_flowcontroller_t *fc = pn_flowcontroller(1024);
+        pn_handler_add(replyto, fc);
+        pn_decref(fc);
+        pn_handshaker_t *handshaker = pn_handshaker();
+        pn_handler_add(replyto, handshaker);
+        pn_decref(handshaker);
+        pn_record_t *record = pn_link_attachments(link);
+        pn_record_set_handler(record, replyto);
+        pn_decref(replyto);
+      }
+    }
+    break;
+  case PN_CONNECTION_LOCAL_CLOSE:
+    {
+      statistics_report(sc->stats, sc->sent, sc->received);
+    }
+    break;
+  default:
+    break;
+  }
+}
+
+// Create the sender handler; its memory holds the initialized sender context.
+pn_handler_t *sender_handler(Options_t *opts, Statistics_t *stats)
+{
+  pn_handler_t *handler = pn_handler_new(sender_dispatch, sizeof(sender_context_t), sender_cleanup);
+  sender_context_init(sender_context(handler), opts, stats);
+  return handler;
+}
+
+// The reply handler's memory holds a pointer to the sender context; return it.
+sender_context_t *replyto_sender_context(pn_handler_t *h)
+{
+  sender_context_t **slot = (sender_context_t **) pn_handler_mem(h);
+  return slot[0];
+}
+
+// Nothing to release: the handler memory holds only a pointer to the sender
+// context, whose resources are freed by sender_cleanup().
+void replyto_cleanup(pn_handler_t *h)
+{}
+
+/*
+ * Event handler for the reply link.  Decodes each complete reply into the
+ * scratch reply message, records it in the statistics, and closes the link
+ * and connection once as many replies as requested messages have arrived.
+ * Events other than PN_DELIVERY are ignored.
+ */
+void replyto_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) {
+  sender_context_t *sc = replyto_sender_context(h);
+
+  if (type != PN_DELIVERY)
+    return;
+
+  check(sc->opts->get_replies, "Unexpected reply message");
+  pn_link_t *reply_link = pn_event_link(event);
+  pn_delivery_t *delivery = pn_event_delivery(event);
+  if (pn_link_is_receiver(reply_link) && !pn_delivery_partial(delivery)) {
+    size_t pending = pn_delivery_pending(delivery);
+    check(pending <= sc->encoded_data_size, "decoding buffer too small");
+    ssize_t got = pn_link_recv(reply_link, sc->encoded_data, pending);
+    check(got == (ssize_t)pending, "read fail on reply link");
+    pn_message_t *reply = sc->reply_message;
+    int rc = pn_message_decode(reply, sc->encoded_data, got);
+    check(rc == 0, "message decode error");
+    statistics_msg_received(sc->stats, reply);
+    sc->received++;
+    pn_delivery_settle(delivery);
+  }
+  if (sc->received == sc->opts->msg_count) {
+    // All expected replies are in: tear down the link and the connection.
+    pn_link_close(reply_link);
+    pn_connection_close(pn_event_connection(event));
+  }
+}
+
+// Create a reply-link handler whose memory stores a pointer to the sender context.
+pn_handler_t *replyto_handler(sender_context_t *sc)
+{
+  pn_handler_t *handler = pn_handler_new(replyto_dispatch, sizeof(sender_context_t *), replyto_cleanup);
+  sender_context_t **slot = (sender_context_t **) pn_handler_mem(handler);
+  slot[0] = sc;
+  return handler;
+}
+
+/*
+ * Parse the command line into *opts.
+ *
+ * All fields are zeroed first; msg_size and send_batch default to 1024,
+ * timeout and recv_count to -1.  A malformed numeric argument or an unknown
+ * option ends in usage(1).  If no target is given, "amqp://0.0.0.0" is used.
+ */
+static void parse_options( int argc, char **argv, Options_t *opts )
+{
+    int c;
+    opterr = 0;
+
+    memset( opts, 0, sizeof(*opts) );
+    opts->msg_size  = 1024;
+    opts->send_batch = 1024;
+    opts->timeout = -1;
+    opts->recv_count = -1;
+    addresses_init(&opts->targets);
+
+    // In the error messages below, report the option being parsed (c).
+    // optopt is only set by getopt() for unknown options or missing arguments.
+    while ((c = getopt(argc, argv,
+                       "a:c:b:p:w:e:l:Rt:W:B:VN:T:C:K:P:")) != -1) {
+        switch(c) {
+        case 'a':
+          {
+            // TODO: multiple addresses?  To keep tests happy, accept multiple for now,
+            // but ignore all but the first.
+            addresses_merge( &opts->targets, optarg );
+          }
+          break;
+        case 'c':
+            if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) {
+                fprintf(stderr, "Option -%c requires an integer argument.\n", c);
+                usage(1);
+            }
+            break;
+        case 'b':
+            if (sscanf( optarg, "%u", &opts->msg_size ) != 1) {
+                fprintf(stderr, "Option -%c requires an integer argument.\n", c);
+                usage(1);
+            }
+            break;
+        case 'p':
+            if (sscanf( optarg, "%u", &opts->send_batch ) != 1) {
+                fprintf(stderr, "Option -%c requires an integer argument.\n", c);
+                usage(1);
+            }
+            break;
+        case 'w':
+            if (sscanf( optarg, "%d", &opts->outgoing_window ) != 1) {
+                fprintf(stderr, "Option -%c requires an integer argument.\n", c);
+                usage(1);
+            }
+            break;
+        case 'e':
+            if (sscanf( optarg, "%u", &opts->report_interval ) != 1) {
+                fprintf(stderr, "Option -%c requires an integer argument.\n", c);
+                usage(1);
+            }
+            break;
+        case 'R': opts->get_replies = 1; break;
+        case 't':
+            if (sscanf( optarg, "%d", &opts->timeout ) != 1) {
+                fprintf(stderr, "Option -%c requires an integer argument.\n", c);
+                usage(1);
+            }
+            // Given in seconds, kept in milliseconds from here on.
+            if (opts->timeout > 0) opts->timeout *= 1000;
+            break;
+        case 'W':
+            if (sscanf( optarg, "%d", &opts->incoming_window ) != 1) {
+                fprintf(stderr, "Option -%c requires an integer argument.\n", c);
+                usage(1);
+            }
+            break;
+        case 'B':
+            if (sscanf( optarg, "%d", &opts->recv_count ) != 1) {
+                fprintf(stderr, "Option -%c requires an integer argument.\n", c);
+                usage(1);
+            }
+            break;
+        case 'V': enable_logging(); break;
+        case 'N': opts->name = optarg; break;
+        case 'T': opts->ca_db = optarg; break;
+        case 'C': opts->certificate = optarg; break;
+        case 'K': opts->privatekey = optarg; break;
+        case 'P': parse_password( optarg, &opts->password ); break;
+
+        default:
+            usage(1);
+        }
+    }
+
+    // default target if none specified
+    if (opts->targets.count == 0) addresses_add( &opts->targets, "amqp://0.0.0.0" );
+}
+
+
+/*
+ * Entry point: parse the options, create the sender handler with a
+ * handshaker attached, open a reactor connection driven by that handler,
+ * run the reactor until it finishes, then free everything.
+ */
+int main(int argc, char** argv)
+{
+  Options_t opts;
+  Statistics_t stats;
+  parse_options( argc, argv, &opts );
+
+  pn_reactor_t *reactor = pn_reactor();
+  pn_handler_t *sh = sender_handler(&opts, &stats);
+  // NOTE(review): the handshaker's creation reference is not dropped here,
+  // unlike the pn_decref() after pn_handler_add() in sender_dispatch() - confirm intent.
+  pn_handler_add(sh, pn_handshaker());
+  pn_reactor_connection(reactor, sh);
+  pn_reactor_run(reactor);
+  pn_reactor_free(reactor);
+
+  pn_handler_free(sh);
+  addresses_free(&opts.targets);
+  return 0;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to