http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/messenger/messenger.c
----------------------------------------------------------------------
diff --git a/proton-c/src/messenger/messenger.c b/proton-c/src/messenger/messenger.c
deleted file mode 100644
index 264a733..0000000
--- a/proton-c/src/messenger/messenger.c
+++ /dev/null
@@ -1,2438 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/messenger.h>
-
-#include <proton/connection.h>
-#include <proton/delivery.h>
-#include <proton/event.h>
-#include <proton/object.h>
-#include <proton/sasl.h>
-#include <proton/session.h>
-
-#include <assert.h>
-#include <ctype.h>
-#include <stdlib.h>
-#include <string.h>
-#include <stdio.h>
-
-#include "core/log_private.h"
-#include "core/util.h"
-#include "platform/platform.h" // pn_i_getpid, pn_i_now, pni_snprintf
-#include "platform/platform_fmt.h"
-#include "store.h"
-#include "subscription.h"
-#include "transform.h"
-
-#include "reactor/io.h"
-#include "reactor/selectable.h"
-#include "reactor/selector.h"
-
-typedef struct pn_link_ctx_t pn_link_ctx_t;
-
-typedef struct {
-  pn_string_t *text;
-  bool passive;
-  char *scheme;
-  char *user;
-  char *pass;
-  char *host;
-  char *port;
-  char *name;
-} pn_address_t;
-
-// algorithm for granting credit to receivers
-typedef enum {
-  // pn_messenger_recv( X ), where:
-  LINK_CREDIT_EXPLICIT, // X > 0
-  LINK_CREDIT_AUTO,     // X == -1
-  LINK_CREDIT_MANUAL    // X == -2
-} pn_link_credit_mode_t;
-
-struct pn_messenger_t {
-  pn_address_t address;
-  char *name;
-  char *certificate;
-  char *private_key;
-  char *password;
-  char *trusted_certificates;
-  pn_io_t *io;
-  pn_list_t *pending; // pending selectables
-  pn_selectable_t *interruptor;
-  pn_socket_t ctrl[2];
-  pn_list_t *listeners;
-  pn_list_t *connections;
-  pn_selector_t *selector;
-  pn_collector_t *collector;
-  pn_list_t *credited;
-  pn_list_t *blocked;
-  pn_timestamp_t next_drain;
-  uint64_t next_tag;
-  pni_store_t *outgoing;
-  pni_store_t *incoming;
-  pn_list_t *subscriptions;
-  pn_subscription_t *incoming_subscription;
-  pn_error_t *error;
-  pn_transform_t *routes;
-  pn_transform_t *rewrites;
-  pn_tracker_t outgoing_tracker;
-  pn_tracker_t incoming_tracker;
-  pn_string_t *original;
-  pn_string_t *rewritten;
-  pn_string_t *domain;
-  int timeout;
-  int send_threshold;
-  pn_link_credit_mode_t credit_mode;
-  int credit_batch;  // when LINK_CREDIT_AUTO
-  int credit;        // available
-  int distributed;   // credit
-  int receivers;     // # receiver links
-  int draining;      // # links in drain state
-  int connection_error;
-  int flags;
-  int snd_settle_mode;          /* pn_snd_settle_mode_t or -1 for unset */
-  pn_rcv_settle_mode_t rcv_settle_mode;
-  pn_tracer_t tracer;
-  pn_ssl_verify_mode_t ssl_peer_authentication_mode;
-  bool blocking;
-  bool passive;
-  bool interrupted;
-  bool worked;
-};
-
-#define CTX_HEAD                                \
-  pn_messenger_t *messenger;                    \
-  pn_selectable_t *selectable;                  \
-  bool pending;
-
-typedef struct pn_ctx_t {
-  CTX_HEAD
-} pn_ctx_t;
-
-typedef struct {
-  CTX_HEAD
-  char *host;
-  char *port;
-  pn_subscription_t *subscription;
-  pn_ssl_domain_t *domain;
-} pn_listener_ctx_t;
-
-typedef struct {
-  CTX_HEAD
-  pn_connection_t *connection;
-  char *address;
-  char *scheme;
-  char *user;
-  char *pass;
-  char *host;
-  char *port;
-  pn_listener_ctx_t *listener;
-} pn_connection_ctx_t;
-
-static pn_connection_ctx_t *pni_context(pn_selectable_t *sel)
-{
-  assert(sel);
-  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pni_selectable_get_context(sel);
-  assert(ctx);
-  return ctx;
-}
-
-static pn_transport_t *pni_transport(pn_selectable_t *sel)
-{
-  return pn_connection_transport(pni_context(sel)->connection);
-}
-
-static ssize_t pni_connection_capacity(pn_selectable_t *sel)
-{
-  pn_transport_t *transport = pni_transport(sel);
-  ssize_t capacity = pn_transport_capacity(transport);
-  if (capacity < 0) {
-    if (pn_transport_closed(transport)) {
-      pn_selectable_terminate(sel);
-    }
-  }
-  return capacity;
-}
-
-bool pn_messenger_flow(pn_messenger_t *messenger);
-
-static ssize_t pni_connection_pending(pn_selectable_t *sel)
-{
-  pn_connection_ctx_t *ctx = pni_context(sel);
-  pn_messenger_flow(ctx->messenger);
-  pn_transport_t *transport = pni_transport(sel);
-  ssize_t pending = pn_transport_pending(transport);
-  if (pending < 0) {
-    if (pn_transport_closed(transport)) {
-      pn_selectable_terminate(sel);
-    }
-  }
-  return pending;
-}
-
-static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel)
-{
-  pn_connection_ctx_t *ctx = pni_context(sel);
-  return ctx->messenger->next_drain;
-}
-
-static void pni_connection_update(pn_selectable_t *sel) {
-  ssize_t c = pni_connection_capacity(sel);
-  pn_selectable_set_reading(sel, c > 0);
-  ssize_t p = pni_connection_pending(sel);
-  pn_selectable_set_writing(sel, p > 0);
-  pn_selectable_set_deadline(sel, pni_connection_deadline(sel));
-  if (c < 0 && p < 0) {
-    pn_selectable_terminate(sel);
-  }
-}
-
-#include <errno.h>
-
-static void pn_error_report(const char *pfx, const char *error)
-{
-  pn_logf("%s ERROR %s", pfx, error);
-}
-
-void pni_modified(pn_ctx_t *ctx)
-{
-  pn_messenger_t *m = ctx->messenger;
-  pn_selectable_t *sel = ctx->selectable;
-  if (pn_selectable_is_registered(sel) && !ctx->pending) {
-    pn_list_add(m->pending, sel);
-    ctx->pending = true;
-  }
-}
-
-void pni_conn_modified(pn_connection_ctx_t *ctx)
-{
-  pni_connection_update(ctx->selectable);
-  pni_modified((pn_ctx_t *) ctx);
-}
-
-void pni_lnr_modified(pn_listener_ctx_t *lnr)
-{
-  pni_modified((pn_ctx_t *) lnr);
-}
-
-int pn_messenger_process_events(pn_messenger_t *messenger);
-
-static void pni_connection_error(pn_selectable_t *sel)
-{
-  pn_transport_t *transport = pni_transport(sel);
-  pn_transport_close_tail(transport);
-  pn_transport_close_head(transport);
-}
-
-static void pni_connection_readable(pn_selectable_t *sel)
-{
-  pn_connection_ctx_t *context = pni_context(sel);
-  pn_messenger_t *messenger = context->messenger;
-  pn_connection_t *connection = context->connection;
-  pn_transport_t *transport = pni_transport(sel);
-  ssize_t capacity = pn_transport_capacity(transport);
-  if (capacity > 0) {
-    ssize_t n = pn_recv(messenger->io, pn_selectable_get_fd(sel),
-                        pn_transport_tail(transport), capacity);
-    if (n <= 0) {
-      if (n == 0 || !pn_wouldblock(messenger->io)) {
-        if (n < 0) perror("recv");
-        pn_transport_close_tail(transport);
-        if (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) {
-          pn_error_report("CONNECTION", "connection aborted (remote)");
-        }
-      }
-    } else {
-      int err = pn_transport_process(transport, (size_t)n);
-      if (err)
-        pn_error_copy(messenger->error, pn_transport_error(transport));
-    }
-  }
-
-  pn_messenger_process_events(messenger);
-  pn_messenger_flow(messenger);
-  messenger->worked = true;
-  pni_conn_modified(context);
-}
-
-static void pni_connection_writable(pn_selectable_t *sel)
-{
-  pn_connection_ctx_t *context = pni_context(sel);
-  pn_messenger_t *messenger = context->messenger;
-  pn_transport_t *transport = pni_transport(sel);
-  ssize_t pending = pn_transport_pending(transport);
-  if (pending > 0) {
-    ssize_t n = pn_send(messenger->io, pn_selectable_get_fd(sel),
-                        pn_transport_head(transport), pending);
-    if (n < 0) {
-      if (!pn_wouldblock(messenger->io)) {
-        perror("send");
-        pn_transport_close_head(transport);
-      }
-    } else {
-      pn_transport_pop(transport, n);
-    }
-  }
-
-  pn_messenger_process_events(messenger);
-  pn_messenger_flow(messenger);
-  messenger->worked = true;
-  pni_conn_modified(context);
-}
-
-static void pni_connection_expired(pn_selectable_t *sel)
-{
-  pn_connection_ctx_t *ctx = pni_context(sel);
-  pn_messenger_flow(ctx->messenger);
-  ctx->messenger->worked = true;
-  pni_conn_modified(ctx);
-}
-
-static void pni_messenger_reclaim(pn_messenger_t *messenger, pn_connection_t *conn);
-
-static void pni_connection_finalize(pn_selectable_t *sel)
-{
-  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pni_selectable_get_context(sel);
-  pn_socket_t fd = pn_selectable_get_fd(sel);
-  pn_close(ctx->messenger->io, fd);
-  pn_list_remove(ctx->messenger->pending, sel);
-  pni_messenger_reclaim(ctx->messenger, ctx->connection);
-}
-
-pn_connection_t *pn_messenger_connection(pn_messenger_t *messenger,
-                                         pn_socket_t sock,
-                                         const char *scheme,
-                                         char *user,
-                                         char *pass,
-                                         char *host,
-                                         char *port,
-                                         pn_listener_ctx_t *lnr);
-
-static void pni_listener_readable(pn_selectable_t *sel)
-{
-  pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pni_selectable_get_context(sel);
-  pn_subscription_t *sub = ctx->subscription;
-  const char *scheme = pn_subscription_scheme(sub);
-  char name[1024];
-  pn_socket_t sock = pn_accept(ctx->messenger->io, pn_selectable_get_fd(sel), name, 1024);
-
-  pn_transport_t *t = pn_transport();
-  pn_transport_set_server(t);
-  if (ctx->messenger->flags & PN_FLAGS_ALLOW_INSECURE_MECHS) {
-      pn_sasl_t *s = pn_sasl(t);
-      pn_sasl_set_allow_insecure_mechs(s, true);
-  }
-  pn_ssl_t *ssl = pn_ssl(t);
-  pn_ssl_init(ssl, ctx->domain, NULL);
-
-  pn_connection_t *conn = pn_messenger_connection(ctx->messenger, sock, scheme, NULL, NULL, NULL, NULL, ctx);
-  pn_transport_bind(t, conn);
-  pn_decref(t);
-  pni_conn_modified((pn_connection_ctx_t *) pn_connection_get_context(conn));
-}
-
-static void pn_listener_ctx_free(pn_messenger_t *messenger, pn_listener_ctx_t *ctx);
-
-static void pni_listener_finalize(pn_selectable_t *sel)
-{
-  pn_listener_ctx_t *lnr = (pn_listener_ctx_t *) pni_selectable_get_context(sel);
-  pn_messenger_t *messenger = lnr->messenger;
-  pn_close(messenger->io, pn_selectable_get_fd(sel));
-  pn_list_remove(messenger->pending, sel);
-  pn_listener_ctx_free(messenger, lnr);
-}
-
-static bool pn_streq(const char *a, const char *b)
-{
-  return a == b || (a && b && !strcmp(a, b));
-}
-
-static const char *default_port(const char *scheme)
-{
-  if (scheme && pn_streq(scheme, "amqps"))
-    return "5671";
-  else
-    return "5672";
-}
-
-static pn_listener_ctx_t *pn_listener_ctx(pn_messenger_t *messenger,
-                                          const char *scheme,
-                                          const char *host,
-                                          const char *port)
-{
-  pn_socket_t socket = pn_listen(messenger->io, host, port ? port : default_port(scheme));
-  if (socket == PN_INVALID_SOCKET) {
-    pn_error_copy(messenger->error, pn_io_error(messenger->io));
-    pn_error_format(messenger->error, PN_ERR, "CONNECTION ERROR (%s:%s): %s\n",
-                    messenger->address.host, messenger->address.port,
-                    pn_error_text(messenger->error));
-
-    return NULL;
-  }
-
-  pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_class_new(PN_OBJECT, sizeof(pn_listener_ctx_t));
-  ctx->messenger = messenger;
-  ctx->domain = pn_ssl_domain(PN_SSL_MODE_SERVER);
-  if (messenger->certificate) {
-    int err = pn_ssl_domain_set_credentials(ctx->domain, messenger->certificate,
-                                            messenger->private_key,
-                                            messenger->password);
-
-    if (err) {
-      pn_error_format(messenger->error, PN_ERR, "invalid credentials");
-      pn_ssl_domain_free(ctx->domain);
-      pn_free(ctx);
-      pn_close(messenger->io, socket);
-      return NULL;
-    }
-  }
-
-  if (!(scheme && !strcmp(scheme, "amqps"))) {
-    pn_ssl_domain_allow_unsecured_client(ctx->domain);
-  }
-
-  pn_subscription_t *sub = pn_subscription(messenger, scheme, host, port);
-  ctx->subscription = sub;
-  ctx->host = pn_strdup(host);
-  ctx->port = pn_strdup(port);
-
-  pn_selectable_t *selectable = pn_selectable();
-  pn_selectable_set_reading(selectable, true);
-  pn_selectable_on_readable(selectable, pni_listener_readable);
-  pn_selectable_on_release(selectable, pn_selectable_free);
-  pn_selectable_on_finalize(selectable, pni_listener_finalize);
-  pn_selectable_set_fd(selectable, socket);
-  pni_selectable_set_context(selectable, ctx);
-  pn_list_add(messenger->pending, selectable);
-  ctx->selectable = selectable;
-  ctx->pending = true;
-
-  pn_list_add(messenger->listeners, ctx);
-  return ctx;
-}
-
-static void pn_listener_ctx_free(pn_messenger_t *messenger, pn_listener_ctx_t *ctx)
-{
-  pn_list_remove(messenger->listeners, ctx);
-  // XXX: subscriptions are freed when the messenger is freed pn_subscription_free(ctx->subscription);
-  free(ctx->host);
-  free(ctx->port);
-  pn_ssl_domain_free(ctx->domain);
-  pn_free(ctx);
-}
-
-static pn_connection_ctx_t *pn_connection_ctx(pn_messenger_t *messenger,
-                                              pn_connection_t *conn,
-                                              pn_socket_t sock,
-                                              const char *scheme,
-                                              const char *user,
-                                              const char *pass,
-                                              const char *host,
-                                              const char *port,
-                                              pn_listener_ctx_t *lnr)
-{
-  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
-  assert(!ctx);
-  ctx = (pn_connection_ctx_t *) malloc(sizeof(pn_connection_ctx_t));
-  ctx->messenger = messenger;
-  ctx->connection = conn;
-  pn_selectable_t *sel = pn_selectable();
-  ctx->selectable = sel;
-  pn_selectable_on_error(sel, pni_connection_error);
-  pn_selectable_on_readable(sel, pni_connection_readable);
-  pn_selectable_on_writable(sel, pni_connection_writable);
-  pn_selectable_on_expired(sel, pni_connection_expired);
-  pn_selectable_on_release(sel, pn_selectable_free);
-  pn_selectable_on_finalize(sel, pni_connection_finalize);
-  pn_selectable_set_fd(ctx->selectable, sock);
-  pni_selectable_set_context(ctx->selectable, ctx);
-  pn_list_add(messenger->pending, ctx->selectable);
-  ctx->pending = true;
-  ctx->scheme = pn_strdup(scheme);
-  ctx->user = pn_strdup(user);
-  ctx->pass = pn_strdup(pass);
-  ctx->host = pn_strdup(host);
-  ctx->port = pn_strdup(port);
-  ctx->listener = lnr;
-  pn_connection_set_context(conn, ctx);
-  return ctx;
-}
-
-static void pn_connection_ctx_free(pn_connection_t *conn)
-{
-  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
-  if (ctx) {
-    pni_selectable_set_context(ctx->selectable, NULL);
-    free(ctx->scheme);
-    free(ctx->user);
-    free(ctx->pass);
-    free(ctx->host);
-    free(ctx->port);
-    free(ctx);
-    pn_connection_set_context(conn, NULL);
-  }
-}
-
-#define OUTGOING (0x0000000000000000)
-#define INCOMING (0x1000000000000000)
-
-#define pn_tracker(direction, sequence) ((direction) | (sequence))
-#define pn_tracker_direction(tracker) ((tracker) & (0x1000000000000000))
-#define pn_tracker_sequence(tracker) ((pn_sequence_t) ((tracker) & (0x00000000FFFFFFFF)))
-
-
-static char *build_name(const char *name)
-{
-  static bool seeded = false;
-  // UUID standard format: 8-4-4-4-12 (36 chars, 32 alphanumeric and 4 hypens)
-  static const char *uuid_fmt = "%02X%02X%02X%02X-%02X%02X-%02X%02X-%02X%02X-%02X%02X%02X%02X%02X%02X";
-
-  int count = 0;
-  char *generated;
-  uint8_t bytes[16];
-  unsigned int r = 0;
-
-  if (name) {
-    return pn_strdup(name);
-  }
-
-  if (!seeded) {
-    int pid = pn_i_getpid();
-    int nowish = (int)pn_i_now();
-    // the lower bits of time are the most random, shift pid to push some
-    // randomness into the higher order bits
-    srand(nowish ^ (pid<<16));
-    seeded = true;
-  }
-
-  while (count < 16) {
-    if (!r) {
-      r =  (unsigned int) rand();
-    }
-
-    bytes[count] = r & 0xFF;
-    r >>= 8;
-    count++;
-  }
-
-  // From RFC4122, the version bits are set to 0100
-  bytes[6] = (bytes[6] & 0x0F) | 0x40;
-
-  // From RFC4122, the top two bits of byte 8 get set to 01
-  bytes[8] = (bytes[8] & 0x3F) | 0x80;
-
-  generated = (char *) malloc(37*sizeof(char));
-  sprintf(generated, uuid_fmt,
-         bytes[0], bytes[1], bytes[2], bytes[3],
-         bytes[4], bytes[5], bytes[6], bytes[7],
-         bytes[8], bytes[9], bytes[10], bytes[11],
-         bytes[12], bytes[13], bytes[14], bytes[15]);
-  return generated;
-}
-
-struct pn_link_ctx_t {
-  pn_subscription_t *subscription;
-};
-
-// compute the maximum amount of credit each receiving link is
-// entitled to.  The actual credit given to the link depends on what
-// amount of credit is actually available.
-static int per_link_credit( pn_messenger_t *messenger )
-{
-  if (messenger->receivers == 0) return 0;
-  int total = messenger->credit + messenger->distributed;
-  return pn_max(total/messenger->receivers, 1);
-}
-
-static void link_ctx_setup( pn_messenger_t *messenger,
-                            pn_connection_t *connection,
-                            pn_link_t *link )
-{
-  if (pn_link_is_receiver(link)) {
-    messenger->receivers++;
-    pn_link_ctx_t *ctx = (pn_link_ctx_t *) calloc(1, sizeof(pn_link_ctx_t));
-    assert( ctx );
-    assert( !pn_link_get_context(link) );
-    pn_link_set_context( link, ctx );
-    pn_list_add(messenger->blocked, link);
-  }
-}
-
-static void link_ctx_release( pn_messenger_t *messenger, pn_link_t *link )
-{
-  if (pn_link_is_receiver(link)) {
-    pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( link );
-    if (!ctx) return;
-    assert( messenger->receivers > 0 );
-    messenger->receivers--;
-    if (pn_link_get_drain(link)) {
-      pn_link_set_drain(link, false);
-      assert( messenger->draining > 0 );
-      messenger->draining--;
-    }
-    pn_list_remove(messenger->credited, link);
-    pn_list_remove(messenger->blocked, link);
-    pn_link_set_context( link, NULL );
-    free( ctx );
-  }
-}
-
-static void pni_interruptor_readable(pn_selectable_t *sel)
-{
-  pn_messenger_t *messenger = (pn_messenger_t *) pni_selectable_get_context(sel);
-  char buf[1024];
-  pn_read(messenger->io, pn_selectable_get_fd(sel), buf, 1024);
-  messenger->interrupted = true;
-}
-
-static void pni_interruptor_finalize(pn_selectable_t *sel)
-{
-  pn_messenger_t *messenger = (pn_messenger_t *) pni_selectable_get_context(sel);
-  messenger->interruptor = NULL;
-}
-
-pn_messenger_t *pn_messenger(const char *name)
-{
-  pn_messenger_t *m = (pn_messenger_t *) malloc(sizeof(pn_messenger_t));
-
-  if (m) {
-    m->name = build_name(name);
-    m->certificate = NULL;
-    m->private_key = NULL;
-    m->password = NULL;
-    m->trusted_certificates = NULL;
-    m->timeout = -1;
-    m->blocking = true;
-    m->passive = false;
-    m->io = pn_io();
-    m->pending = pn_list(PN_WEAKREF, 0);
-    m->interruptor = pn_selectable();
-    pn_selectable_set_reading(m->interruptor, true);
-    pn_selectable_on_readable(m->interruptor, pni_interruptor_readable);
-    pn_selectable_on_release(m->interruptor, pn_selectable_free);
-    pn_selectable_on_finalize(m->interruptor, pni_interruptor_finalize);
-    pn_list_add(m->pending, m->interruptor);
-    m->interrupted = false;
-    // Explicitly initialise pipe file descriptors to invalid values in case pipe
-    // fails, if we don't do this m->ctrl[0] could default to 0 - which is stdin.
-    m->ctrl[0] = -1;
-    m->ctrl[1] = -1;
-    pn_pipe(m->io, m->ctrl);
-    pn_selectable_set_fd(m->interruptor, m->ctrl[0]);
-    pni_selectable_set_context(m->interruptor, m);
-    m->listeners = pn_list(PN_WEAKREF, 0);
-    m->connections = pn_list(PN_WEAKREF, 0);
-    m->selector = pn_io_selector(m->io);
-    m->collector = pn_collector();
-    m->credit_mode = LINK_CREDIT_EXPLICIT;
-    m->credit_batch = 1024;
-    m->credit = 0;
-    m->distributed = 0;
-    m->receivers = 0;
-    m->draining = 0;
-    m->credited = pn_list(PN_WEAKREF, 0);
-    m->blocked = pn_list(PN_WEAKREF, 0);
-    m->next_drain = 0;
-    m->next_tag = 0;
-    m->outgoing = pni_store();
-    m->incoming = pni_store();
-    m->subscriptions = pn_list(PN_OBJECT, 0);
-    m->incoming_subscription = NULL;
-    m->error = pn_error();
-    m->routes = pn_transform();
-    m->rewrites = pn_transform();
-    m->outgoing_tracker = 0;
-    m->incoming_tracker = 0;
-    m->address.text = pn_string(NULL);
-    m->original = pn_string(NULL);
-    m->rewritten = pn_string(NULL);
-    m->domain = pn_string(NULL);
-    m->connection_error = 0;
-    m->flags = PN_FLAGS_ALLOW_INSECURE_MECHS; // TODO: Change this back to 0 for the Proton 0.11 release
-    m->snd_settle_mode = -1;    /* Default depends on sender/receiver */
-    m->rcv_settle_mode = PN_RCV_FIRST;
-    m->tracer = NULL;
-    m->ssl_peer_authentication_mode = PN_SSL_VERIFY_PEER_NAME;
-  }
-
-  return m;
-}
-
-int pni_messenger_add_subscription(pn_messenger_t *messenger, pn_subscription_t *subscription)
-{
-  return pn_list_add(messenger->subscriptions, subscription);
-}
-
-
-const char *pn_messenger_name(pn_messenger_t *messenger)
-{
-  return messenger->name;
-}
-
-int pn_messenger_set_certificate(pn_messenger_t *messenger, const char *certificate)
-{
-  if (messenger->certificate) free(messenger->certificate);
-  messenger->certificate = pn_strdup(certificate);
-  return 0;
-}
-
-const char *pn_messenger_get_certificate(pn_messenger_t *messenger)
-{
-  return messenger->certificate;
-}
-
-int pn_messenger_set_private_key(pn_messenger_t *messenger, const char *private_key)
-{
-  if (messenger->private_key) free(messenger->private_key);
-  messenger->private_key = pn_strdup(private_key);
-  return 0;
-}
-
-const char *pn_messenger_get_private_key(pn_messenger_t *messenger)
-{
-  return messenger->private_key;
-}
-
-int pn_messenger_set_password(pn_messenger_t *messenger, const char *password)
-{
-  if (messenger->password) free(messenger->password);
-  messenger->password = pn_strdup(password);
-  return 0;
-}
-
-const char *pn_messenger_get_password(pn_messenger_t *messenger)
-{
-  return messenger->password;
-}
-
-int pn_messenger_set_trusted_certificates(pn_messenger_t *messenger, const char *trusted_certificates)
-{
-  if (messenger->trusted_certificates) free(messenger->trusted_certificates);
-  messenger->trusted_certificates = pn_strdup(trusted_certificates);
-  return 0;
-}
-
-const char *pn_messenger_get_trusted_certificates(pn_messenger_t *messenger)
-{
-  return messenger->trusted_certificates;
-}
-
-int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout)
-{
-  if (!messenger) return PN_ARG_ERR;
-  messenger->timeout = timeout;
-  return 0;
-}
-
-int pn_messenger_get_timeout(pn_messenger_t *messenger)
-{
-  return messenger ? messenger->timeout : 0;
-}
-
-bool pn_messenger_is_blocking(pn_messenger_t *messenger)
-{
-  assert(messenger);
-  return messenger->blocking;
-}
-
-int pn_messenger_set_blocking(pn_messenger_t *messenger, bool blocking)
-{
-  messenger->blocking = blocking;
-  return 0;
-}
-
-bool pn_messenger_is_passive(pn_messenger_t *messenger)
-{
-  assert(messenger);
-  return messenger->passive;
-}
-
-int pn_messenger_set_passive(pn_messenger_t *messenger, bool passive)
-{
-  messenger->passive = passive;
-  return 0;
-}
-
-pn_selectable_t *pn_messenger_selectable(pn_messenger_t *messenger)
-{
-  assert(messenger);
-  pn_messenger_process_events(messenger);
-  pn_list_t *p = messenger->pending;
-  size_t n = pn_list_size(p);
-  if (n) {
-    pn_selectable_t *s = (pn_selectable_t *) pn_list_get(p, n - 1);
-    pn_list_del(p, n-1, 1);
-    // this is a total hack, messenger has selectables whose context
-    // are the messenger itself and whose context share a common
-    // prefix that is described by pn_ctx_t
-    void *c = pni_selectable_get_context(s);
-    if (c != messenger) {
-      pn_ctx_t *ctx = (pn_ctx_t *) c;
-      ctx->pending = false;
-    }
-    return s;
-  } else {
-    return NULL;
-  }
-}
-
-static void pni_reclaim(pn_messenger_t *messenger)
-{
-  while (pn_list_size(messenger->listeners)) {
-    pn_listener_ctx_t *l = (pn_listener_ctx_t *) pn_list_get(messenger->listeners, 0);
-    pn_listener_ctx_free(messenger, l);
-  }
-
-  while (pn_list_size(messenger->connections)) {
-    pn_connection_t *c = (pn_connection_t *) pn_list_get(messenger->connections, 0);
-    pni_messenger_reclaim(messenger, c);
-  }
-}
-
-void pn_messenger_free(pn_messenger_t *messenger)
-{
-  if (messenger) {
-    pn_free(messenger->domain);
-    pn_free(messenger->rewritten);
-    pn_free(messenger->original);
-    pn_free(messenger->address.text);
-    free(messenger->name);
-    free(messenger->certificate);
-    free(messenger->private_key);
-    free(messenger->password);
-    free(messenger->trusted_certificates);
-    pni_reclaim(messenger);
-    pn_free(messenger->pending);
-    pn_selectable_free(messenger->interruptor);
-    pn_close(messenger->io, messenger->ctrl[0]);
-    pn_close(messenger->io, messenger->ctrl[1]);
-    pn_free(messenger->listeners);
-    pn_free(messenger->connections);
-    pn_selector_free(messenger->selector);
-    pn_collector_free(messenger->collector);
-    pn_error_free(messenger->error);
-    pni_store_free(messenger->incoming);
-    pni_store_free(messenger->outgoing);
-    pn_free(messenger->subscriptions);
-    pn_free(messenger->rewrites);
-    pn_free(messenger->routes);
-    pn_free(messenger->credited);
-    pn_free(messenger->blocked);
-    pn_free(messenger->io);
-    free(messenger);
-  }
-}
-
-int pn_messenger_errno(pn_messenger_t *messenger)
-{
-  if (messenger) {
-    return pn_error_code(messenger->error);
-  } else {
-    return PN_ARG_ERR;
-  }
-}
-
-pn_error_t *pn_messenger_error(pn_messenger_t *messenger)
-{
-  assert(messenger);
-  return messenger->error;
-}
-
-// Run the credit scheduler, grant flow as needed.  Return True if
-// credit allocation for any link has changed.
-bool pn_messenger_flow(pn_messenger_t *messenger)
-{
-  bool updated = false;
-  if (messenger->receivers == 0) {
-    messenger->next_drain = 0;
-    return updated;
-  }
-
-  if (messenger->credit_mode == LINK_CREDIT_AUTO) {
-    // replenish, but limit the max total messages buffered
-    const int max = messenger->receivers * messenger->credit_batch;
-    const int used = messenger->distributed + pn_messenger_incoming(messenger);
-    if (max > used)
-      messenger->credit = max - used;
-  } else if (messenger->credit_mode == LINK_CREDIT_MANUAL) {
-    messenger->next_drain = 0;
-    return false;
-  }
-
-  const int batch = per_link_credit(messenger);
-  while (messenger->credit > 0 && pn_list_size(messenger->blocked)) {
-    pn_link_t *link = (pn_link_t *) pn_list_get(messenger->blocked, 0);
-    pn_list_del(messenger->blocked, 0, 1);
-
-    const int more = pn_min( messenger->credit, batch );
-    messenger->distributed += more;
-    messenger->credit -= more;
-    pn_link_flow(link, more);
-    pn_list_add(messenger->credited, link);
-    updated = true;
-  }
-
-  if (!pn_list_size(messenger->blocked)) {
-    messenger->next_drain = 0;
-  } else {
-    // not enough credit for all links
-    if (!messenger->draining) {
-      pn_logf("%s: let's drain", messenger->name);
-      if (messenger->next_drain == 0) {
-        messenger->next_drain = pn_i_now() + 250;
-        pn_logf("%s: initializing next_drain", messenger->name);
-      } else if (messenger->next_drain <= pn_i_now()) {
-        // initiate drain, free up at most enough to satisfy blocked
-        messenger->next_drain = 0;
-        int needed = pn_list_size(messenger->blocked) * batch;
-        for (size_t i = 0; i < pn_list_size(messenger->credited); i++) {
-          pn_link_t *link = (pn_link_t *) pn_list_get(messenger->credited, i);
-          if (!pn_link_get_drain(link)) {
-            pn_link_set_drain(link, true);
-            needed -= pn_link_remote_credit(link);
-            messenger->draining++;
-            updated = true;
-          }
-
-          if (needed <= 0) {
-            break;
-          }
-        }
-      } else {
-        pn_logf("%s: delaying", messenger->name);
-      }
-    }
-  }
-  return updated;
-}
-
-static int pn_transport_config(pn_messenger_t *messenger,
-                               pn_connection_t *connection)
-{
-  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
-  pn_transport_t *transport = pn_connection_transport(connection);
-  if (messenger->tracer)
-    pn_transport_set_tracer(transport, messenger->tracer);
-  if (ctx->scheme && !strcmp(ctx->scheme, "amqps")) {
-    pn_ssl_domain_t *d = pn_ssl_domain(PN_SSL_MODE_CLIENT);
-    if (messenger->certificate) {
-      int err = pn_ssl_domain_set_credentials( d, messenger->certificate,
-                                               messenger->private_key,
-                                               messenger->password);
-      if (err) {
-        pn_ssl_domain_free(d);
-        pn_error_report("CONNECTION", "invalid credentials");
-        return err;
-      }
-    }
-    if (messenger->trusted_certificates) {
-      int err = pn_ssl_domain_set_trusted_ca_db(d, messenger->trusted_certificates);
-      if (err) {
-        pn_ssl_domain_free(d);
-        pn_error_report("CONNECTION", "invalid certificate db");
-        return err;
-      }
-      err = pn_ssl_domain_set_peer_authentication(
-          d, messenger->ssl_peer_authentication_mode, NULL);
-      if (err) {
-        pn_ssl_domain_free(d);
-        pn_error_report("CONNECTION", "error configuring ssl to verify peer");
-      }
-    } else {
-      int err = pn_ssl_domain_set_peer_authentication(d, PN_SSL_ANONYMOUS_PEER, NULL);
-      if (err) {
-        pn_ssl_domain_free(d);
-        pn_error_report("CONNECTION", "error configuring ssl for anonymous peer");
-        return err;
-      }
-    }
-    pn_ssl_t *ssl = pn_ssl(transport);
-    pn_ssl_init(ssl, d, NULL);
-    pn_ssl_domain_free( d );
-  }
-
-  return 0;
-}
-
-static void pn_condition_report(const char *pfx, pn_condition_t *condition)
-{
-  if (pn_condition_is_redirect(condition)) {
-    pn_logf("%s NOTICE (%s) redirecting to %s:%i",
-            pfx,
-            pn_condition_get_name(condition),
-            pn_condition_redirect_host(condition),
-            pn_condition_redirect_port(condition));
-  } else if (pn_condition_is_set(condition)) {
-    char error[1024];
-    pni_snprintf(error, 1024, "(%s) %s",
-             pn_condition_get_name(condition),
-             pn_condition_get_description(condition));
-    pn_error_report(pfx, error);
-  }
-}
-
-int pni_pump_in(pn_messenger_t *messenger, const char *address, pn_link_t *receiver)
-{
-  pn_delivery_t *d = pn_link_current(receiver);
-  if (!pn_delivery_readable(d) || pn_delivery_partial(d)) {
-    return 0;
-  }
-
-  pni_entry_t *entry = pni_store_put(messenger->incoming, address);
-  pn_buffer_t *buf = pni_entry_bytes(entry);
-  pni_entry_set_delivery(entry, d);
-
-  pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( receiver );
-  pni_entry_set_context(entry, ctx ? ctx->subscription : NULL);
-
-  size_t pending = pn_delivery_pending(d);
-  int err = pn_buffer_ensure(buf, pending + 1);
-  if (err) return pn_error_format(messenger->error, err, "get: error growing buffer");
-  char *encoded = pn_buffer_memory(buf).start;
-  ssize_t n = pn_link_recv(receiver, encoded, pending);
-  if (n != (ssize_t) pending) {
-    return pn_error_format(messenger->error, n,
-                           "didn't receive pending bytes: %" PN_ZI " %" PN_ZI,
-                           n, pending);
-  }
-  n = pn_link_recv(receiver, encoded + pending, 1);
-  pn_link_advance(receiver);
-
-  pn_link_t *link = receiver;
-
-  if (messenger->credit_mode != LINK_CREDIT_MANUAL) {
-    // account for the used credit
-    assert(ctx);
-    assert(messenger->distributed);
-    messenger->distributed--;
-
-    // replenish if low (< 20% maximum batch) and credit available
-    if (!pn_link_get_drain(link) && pn_list_size(messenger->blocked) == 0 &&
-        messenger->credit > 0) {
-      const int max = per_link_credit(messenger);
-      const int lo_thresh = (int)(max * 0.2 + 0.5);
-      if (pn_link_remote_credit(link) < lo_thresh) {
-        const int more =
-            pn_min(messenger->credit, max - pn_link_remote_credit(link));
-        messenger->credit -= more;
-        messenger->distributed += more;
-        pn_link_flow(link, more);
-      }
-    }
-    // check if blocked
-    if (pn_list_index(messenger->blocked, link) < 0 &&
-        pn_link_remote_credit(link) == 0) {
-      pn_list_remove(messenger->credited, link);
-      if (pn_link_get_drain(link)) {
-        pn_link_set_drain(link, false);
-        assert(messenger->draining > 0);
-        messenger->draining--;
-      }
-      pn_list_add(messenger->blocked, link);
-    }
-  }
-
-  if (n != PN_EOS) {
-    return pn_error_format(messenger->error, n, "PN_EOS expected");
-  }
-  pn_buffer_append(buf, encoded, pending); // XXX
-
-  return 0;
-}
-
-void pni_messenger_reclaim_link(pn_messenger_t *messenger, pn_link_t *link)
-{
-  if (pn_link_is_receiver(link) && pn_link_credit(link) > 0) {
-    int credit = pn_link_credit(link);
-    messenger->credit += credit;
-    messenger->distributed -= credit;
-  }
-
-  pn_delivery_t *d = pn_unsettled_head(link);
-  while (d) {
-    pni_entry_t *e = (pni_entry_t *) pn_delivery_get_context(d);
-    if (e) {
-      pni_entry_set_delivery(e, NULL);
-      if (pn_delivery_buffered(d)) {
-        pni_entry_set_status(e, PN_STATUS_ABORTED);
-      }
-    }
-    d = pn_unsettled_next(d);
-  }
-
-  link_ctx_release(messenger, link);
-}
-
-void pni_messenger_reclaim(pn_messenger_t *messenger, pn_connection_t *conn)
-{
-  if (!conn) return;
-
-  pn_link_t *link = pn_link_head(conn, 0);
-  while (link) {
-    pni_messenger_reclaim_link(messenger, link);
-    link = pn_link_next(link, 0);
-  }
-
-  pn_list_remove(messenger->connections, conn);
-  pn_connection_ctx_free(conn);
-  pn_transport_free(pn_connection_transport(conn));
-  pn_connection_free(conn);
-}
-
-pn_connection_t *pn_messenger_connection(pn_messenger_t *messenger,
-                                         pn_socket_t sock,
-                                         const char *scheme,
-                                         char *user,
-                                         char *pass,
-                                         char *host,
-                                         char *port,
-                                         pn_listener_ctx_t *lnr)
-{
-  pn_connection_t *connection = pn_connection();
-  if (!connection) return NULL;
-  pn_connection_collect(connection, messenger->collector);
-  pn_connection_ctx(messenger, connection, sock, scheme, user, pass, host, port, lnr);
-
-  pn_connection_set_container(connection, messenger->name);
-  pn_connection_set_hostname(connection, host);
-  pn_connection_set_user(connection, user);
-  pn_connection_set_password(connection, pass);
-
-  pn_list_add(messenger->connections, connection);
-
-  return connection;
-}
-
-void pn_messenger_process_connection(pn_messenger_t *messenger, pn_event_t *event)
-{
-  pn_connection_t *conn = pn_event_connection(event);
-  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
-
-  if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
-    pn_connection_open(conn);
-  }
-
-  if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
-    pn_condition_t *condition = pn_connection_remote_condition(conn);
-    pn_condition_report("CONNECTION", condition);
-    pn_connection_close(conn);
-    if (pn_condition_is_redirect(condition)) {
-      const char *host = pn_condition_redirect_host(condition);
-      char buf[1024];
-      sprintf(buf, "%i", pn_condition_redirect_port(condition));
-
-      pn_close(messenger->io, pn_selectable_get_fd(ctx->selectable));
-      pn_socket_t sock = pn_connect(messenger->io, host, buf);
-      pn_selectable_set_fd(ctx->selectable, sock);
-      pn_transport_unbind(pn_connection_transport(conn));
-      pn_connection_reset(conn);
-      pn_transport_t *t = pn_transport();
-      if (messenger->flags & PN_FLAGS_ALLOW_INSECURE_MECHS &&
-          messenger->address.user && messenger->address.pass) {
-        pn_sasl_t *s = pn_sasl(t);
-        pn_sasl_set_allow_insecure_mechs(s, true);
-      }
-      pn_transport_bind(t, conn);
-      pn_decref(t);
-      pn_transport_config(messenger, conn);
-    }
-  }
-}
-
-void pn_messenger_process_session(pn_messenger_t *messenger, pn_event_t *event)
-{
-  pn_session_t *ssn = pn_event_session(event);
-
-  if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
-    pn_session_open(ssn);
-  }
-
-  if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
-    pn_session_close(ssn);
-  }
-}
-
-void pn_messenger_process_link(pn_messenger_t *messenger, pn_event_t *event)
-{
-  pn_link_t *link = pn_event_link(event);
-  pn_connection_t *conn = pn_event_connection(event);
-  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
-
-  if (pn_link_state(link) & PN_LOCAL_UNINIT) {
-    pn_terminus_copy(pn_link_source(link), pn_link_remote_source(link));
-    pn_terminus_copy(pn_link_target(link), pn_link_remote_target(link));
-    link_ctx_setup( messenger, conn, link );
-    pn_link_open(link);
-    if (pn_link_is_receiver(link)) {
-      pn_listener_ctx_t *lnr = ctx->listener;
-      ((pn_link_ctx_t *)pn_link_get_context(link))->subscription = lnr ? lnr->subscription : NULL;
-    }
-  }
-
-  if (pn_link_state(link) & PN_REMOTE_ACTIVE) {
-    pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context(link);
-    if (ctx) {
-      const char *addr = pn_terminus_get_address(pn_link_remote_source(link));
-      if (ctx->subscription) {
-        pni_subscription_set_address(ctx->subscription, addr);
-      }
-    }
-  }
-
-  if (pn_link_state(link) & PN_REMOTE_CLOSED) {
-    if (PN_LOCAL_ACTIVE & pn_link_state(link)) {
-      pn_condition_report("LINK", pn_link_remote_condition(link));
-      pn_link_close(link);
-      pni_messenger_reclaim_link(messenger, link);
-      pn_link_free(link);
-    }
-  }
-}
-
-int pni_pump_out(pn_messenger_t *messenger, const char *address, pn_link_t *sender);
-
-void pn_messenger_process_flow(pn_messenger_t *messenger, pn_event_t *event)
-{
-  pn_link_t *link = pn_event_link(event);
-
-  if (pn_link_is_sender(link)) {
-    pni_pump_out(messenger, pn_terminus_get_address(pn_link_target(link)), link);
-  } else {
-    // account for any credit left over after draining links has completed
-    if (pn_link_get_drain(link)) {
-      if (!pn_link_draining(link)) {
-        // drain completed!
-        int drained = pn_link_drained(link);
-        messenger->distributed -= drained;
-        messenger->credit += drained;
-        pn_link_set_drain(link, false);
-        messenger->draining--;
-        pn_list_remove(messenger->credited, link);
-        pn_list_add(messenger->blocked, link);
-      }
-    }
-  }
-}
-
-void pn_messenger_process_delivery(pn_messenger_t *messenger, pn_event_t *event)
-{
-  pn_delivery_t *d = pn_event_delivery(event);
-  pn_link_t *link = pn_event_link(event);
-  if (pn_delivery_updated(d)) {
-    if (pn_link_is_sender(link)) {
-      pn_delivery_update(d, pn_delivery_remote_state(d));
-    }
-    pni_entry_t *e = (pni_entry_t *) pn_delivery_get_context(d);
-    if (e) pni_entry_updated(e);
-  }
-  pn_delivery_clear(d);
-  if (pn_delivery_readable(d)) {
-    int err = pni_pump_in(messenger, pn_terminus_get_address(pn_link_source(link)), link);
-    if (err) {
-      pn_logf("%s", pn_error_text(messenger->error));
-    }
-  }
-}
-
-void pn_messenger_process_transport(pn_messenger_t *messenger, pn_event_t *event)
-{
-  pn_connection_t *conn = pn_event_connection(event);
-  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
-  if (ctx) {
-    pni_conn_modified(ctx);
-  }
-}
-
-int pn_messenger_process_events(pn_messenger_t *messenger)
-{
-  int processed = 0;
-  pn_event_t *event;
-  while ((event = pn_collector_peek(messenger->collector))) {
-    processed++;
-    switch (pn_event_type(event)) {
-    case PN_CONNECTION_INIT:
-      pn_logf("connection created: %p", (void *) pn_event_connection(event));
-      break;
-    case PN_SESSION_INIT:
-      pn_logf("session created: %p", (void *) pn_event_session(event));
-      break;
-    case PN_LINK_INIT:
-      pn_logf("link created: %p", (void *) pn_event_link(event));
-      break;
-    case PN_CONNECTION_REMOTE_OPEN:
-    case PN_CONNECTION_REMOTE_CLOSE:
-    case PN_CONNECTION_LOCAL_OPEN:
-    case PN_CONNECTION_LOCAL_CLOSE:
-      pn_messenger_process_connection(messenger, event);
-      break;
-    case PN_SESSION_REMOTE_OPEN:
-    case PN_SESSION_REMOTE_CLOSE:
-    case PN_SESSION_LOCAL_OPEN:
-    case PN_SESSION_LOCAL_CLOSE:
-      pn_messenger_process_session(messenger, event);
-      break;
-    case PN_LINK_REMOTE_OPEN:
-    case PN_LINK_REMOTE_CLOSE:
-    case PN_LINK_REMOTE_DETACH:
-    case PN_LINK_LOCAL_OPEN:
-    case PN_LINK_LOCAL_CLOSE:
-    case PN_LINK_LOCAL_DETACH:
-      pn_messenger_process_link(messenger, event);
-      break;
-    case PN_LINK_FLOW:
-      pn_messenger_process_flow(messenger, event);
-      break;
-    case PN_DELIVERY:
-      pn_messenger_process_delivery(messenger, event);
-      break;
-    case PN_TRANSPORT:
-    case PN_TRANSPORT_ERROR:
-    case PN_TRANSPORT_HEAD_CLOSED:
-    case PN_TRANSPORT_TAIL_CLOSED:
-    case PN_TRANSPORT_CLOSED:
-      pn_messenger_process_transport(messenger, event);
-      break;
-    case PN_EVENT_NONE:
-      break;
-    case PN_CONNECTION_BOUND:
-      break;
-    case PN_CONNECTION_UNBOUND:
-      break;
-    case PN_CONNECTION_FINAL:
-      break;
-    case PN_SESSION_FINAL:
-      break;
-    case PN_LINK_FINAL:
-      break;
-    default:
-      break;
-    }
-    pn_collector_pop(messenger->collector);
-  }
-
-  return processed;
-}
-
-/**
- * Function to invoke AMQP related timer events, such as a heartbeat to prevent
- * remote_idle timeout events
- */
-static void pni_messenger_tick(pn_messenger_t *messenger)
-{
-  for (size_t i = 0; i < pn_list_size(messenger->connections); i++) {
-    pn_connection_t *connection =
-        (pn_connection_t *)pn_list_get(messenger->connections, i);
-    pn_transport_t *transport = pn_connection_transport(connection);
-    if (transport) {
-      pn_transport_tick(transport, pn_i_now());
-
-      // if there is pending data, such as an empty heartbeat frame, call
-      // process events. This should kick off the chain of selectables for
-      // reading/writing.
-      ssize_t pending = pn_transport_pending(transport);
-      if (pending > 0) {
-        pn_connection_ctx_t *cctx =
-            (pn_connection_ctx_t *)pn_connection_get_context(connection);
-        pn_messenger_process_events(messenger);
-        pn_messenger_flow(messenger);
-        pni_conn_modified(pni_context(cctx->selectable));
-      }
-    }
-  }
-}
-
-int pn_messenger_process(pn_messenger_t *messenger)
-{
-  bool doMessengerTick = true;
-  pn_selectable_t *sel;
-  int events;
-  while ((sel = pn_selector_next(messenger->selector, &events))) {
-    if (events & PN_READABLE) {
-      pn_selectable_readable(sel);
-    }
-    if (events & PN_WRITABLE) {
-      pn_selectable_writable(sel);
-      doMessengerTick = false;
-    }
-    if (events & PN_EXPIRED) {
-      pn_selectable_expired(sel);
-    }
-    if (events & PN_ERROR) {
-      pn_selectable_error(sel);
-    }
-  }
-  // ensure timer events are processed. Cannot call this inside the while loop
-  // as the timer events are not seen by the selector
-  if (doMessengerTick) {
-    pni_messenger_tick(messenger);
-  }
-  if (messenger->interrupted) {
-    messenger->interrupted = false;
-    return PN_INTR;
-  } else {
-    return 0;
-  }
-}
-
-pn_timestamp_t pn_messenger_deadline(pn_messenger_t *messenger)
-{
-  // If the scheduler detects credit imbalance on the links, wake up
-  // in time to service credit drain
-  return messenger->next_drain;
-}
-
-int pni_wait(pn_messenger_t *messenger, int timeout)
-{
-  bool wake = false;
-  pn_selectable_t *sel;
-  while ((sel = pn_messenger_selectable(messenger))) {
-    if (pn_selectable_is_terminal(sel)) {
-      if (pn_selectable_is_registered(sel)) {
-        pn_selector_remove(messenger->selector, sel);
-      }
-      pn_selectable_free(sel);
-      // we can't wait if we end up freeing anything because we could
-      // be waiting on the stopped predicate which might become true
-      // as a result of the free
-      wake = true;
-    } else if (pn_selectable_is_registered(sel)) {
-      pn_selector_update(messenger->selector, sel);
-    } else {
-      pn_selector_add(messenger->selector, sel);
-      pn_selectable_set_registered(sel, true);
-    }
-  }
-
-  if (wake) return 0;
-
-  return pn_selector_select(messenger->selector, timeout);
-}
-
-int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
-{
-  if (messenger->passive) {
-    bool pred = predicate(messenger);
-    return pred ? 0 : PN_INPROGRESS;
-  }
-
-  pn_timestamp_t now = pn_i_now();
-  long int deadline = now + timeout;
-  bool pred;
-
-  while (true) {
-    int error = pn_messenger_process(messenger);
-    pred = predicate(messenger);
-    if (error == PN_INTR) {
-      return pred ? 0 : PN_INTR;
-    }
-    int remaining = deadline - now;
-    if (pred || (timeout >= 0 && remaining < 0)) break;
-
-    pn_timestamp_t mdeadline = pn_messenger_deadline(messenger);
-    if (mdeadline) {
-      if (now >= mdeadline)
-        remaining = 0;
-      else {
-        const int delay = mdeadline - now;
-        remaining = (remaining < 0) ? delay : pn_min( remaining, delay );
-      }
-    }
-    error = pni_wait(messenger, remaining);
-    if (error) return error;
-
-    if (timeout >= 0) {
-      now = pn_i_now();
-    }
-  }
-
-  return pred ? 0 : PN_TIMEOUT;
-}
-
-int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
-{
-  if (messenger->blocking) {
-    return pn_messenger_tsync(messenger, predicate, messenger->timeout);
-  } else {
-    int err = pn_messenger_tsync(messenger, predicate, 0);
-    if (err == PN_TIMEOUT) {
-      return PN_INPROGRESS;
-    } else {
-      return err;
-    }
-  }
-}
-
-static void pni_parse(pn_address_t *address);
-pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger,
-                                      const char *address, char **name);
-int pn_messenger_work(pn_messenger_t *messenger, int timeout);
-
-int pn_messenger_start(pn_messenger_t *messenger)
-{
-  if (!messenger) return PN_ARG_ERR;
-
-  int error = 0;
-
-  // When checking of routes is required we attempt to resolve each route
-  // with a substitution that has a defined scheme, address and port. If
-  // any of theses routes is invalid an appropriate error code will be
-  // returned. Currently no attempt is made to check the name part of the
-  // address, as the intent here is to fail fast if the addressed host
-  // is invalid or unavailable.
-  if (messenger->flags & PN_FLAGS_CHECK_ROUTES) {
-    pn_list_t *substitutions = pn_list(PN_WEAKREF, 0);
-    pn_transform_get_substitutions(messenger->routes, substitutions);
-    for (size_t i = 0; i < pn_list_size(substitutions) && error == 0; i++) {
-      pn_string_t *substitution = (pn_string_t *)pn_list_get(substitutions, i);
-      if (substitution) {
-        pn_address_t addr;
-        addr.text = pn_string(NULL);
-        error = pn_string_copy(addr.text, substitution);
-        if (!error) {
-          pni_parse(&addr);
-          if (addr.scheme && strlen(addr.scheme) > 0 &&
-              !strstr(addr.scheme, "$") && addr.host && strlen(addr.host) > 0 &&
-              !strstr(addr.host, "$") && addr.port && strlen(addr.port) > 0 &&
-              !strstr(addr.port, "$")) {
-            pn_string_t *check_addr = pn_string(NULL);
-            // ipv6 hosts need to be wrapped in [] within a URI
-            if (strstr(addr.host, ":")) {
-              pn_string_format(check_addr, "%s://[%s]:%s/", addr.scheme,
-                               addr.host, addr.port);
-            } else {
-              pn_string_format(check_addr, "%s://%s:%s/", addr.scheme,
-                               addr.host, addr.port);
-            }
-            char *name = NULL;
-            pn_connection_t *connection = pn_messenger_resolve(
-                messenger, pn_string_get(check_addr), &name);
-            pn_free(check_addr);
-            if (!connection) {
-              if (pn_error_code(messenger->error) == 0)
-                pn_error_copy(messenger->error, pn_io_error(messenger->io));
-              pn_error_format(messenger->error, PN_ERR,
-                              "CONNECTION ERROR (%s:%s): %s\n",
-                              messenger->address.host, messenger->address.port,
-                              pn_error_text(messenger->error));
-              error = pn_error_code(messenger->error);
-            } else {
-              // Send and receive outstanding messages until connection
-              // completes or an error occurs
-              int work = pn_messenger_work(messenger, -1);
-              pn_connection_ctx_t *cctx =
-                  (pn_connection_ctx_t *)pn_connection_get_context(connection);
-              while ((work > 0 ||
-                      (pn_connection_state(connection) & PN_REMOTE_UNINIT) ||
-                      pni_connection_pending(cctx->selectable) != (ssize_t)0) &&
-                     pn_error_code(messenger->error) == 0)
-                work = pn_messenger_work(messenger, 0);
-              if (work < 0 && work != PN_TIMEOUT) {
-                error = work;
-              } else {
-                error = pn_error_code(messenger->error);
-              }
-            }
-          }
-          pn_free(addr.text);
-        }
-      }
-    }
-    pn_free(substitutions);
-  }
-
-  return error;
-}
-
-bool pn_messenger_stopped(pn_messenger_t *messenger)
-{
-  return pn_list_size(messenger->connections) == 0 && pn_list_size(messenger->listeners) == 0;
-}
-
-int pn_messenger_stop(pn_messenger_t *messenger)
-{
-  if (!messenger) return PN_ARG_ERR;
-
-  for (size_t i = 0; i < pn_list_size(messenger->connections); i++) {
-    pn_connection_t *conn = (pn_connection_t *) pn_list_get(messenger->connections, i);
-    pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
-    while (link) {
-      pn_link_close(link);
-      link = pn_link_next(link, PN_LOCAL_ACTIVE);
-    }
-    pn_connection_close(conn);
-  }
-
-  for (size_t i = 0; i < pn_list_size(messenger->listeners); i++) {
-    pn_listener_ctx_t *lnr = (pn_listener_ctx_t *) pn_list_get(messenger->listeners, i);
-    pn_selectable_terminate(lnr->selectable);
-    pni_lnr_modified(lnr);
-  }
-
-  return pn_messenger_sync(messenger, pn_messenger_stopped);
-}
-
-static void pni_parse(pn_address_t *address)
-{
-  address->passive = false;
-  address->scheme = NULL;
-  address->user = NULL;
-  address->pass = NULL;
-  address->host = NULL;
-  address->port = NULL;
-  address->name = NULL;
-  pni_parse_url(pn_string_buffer(address->text), &address->scheme, &address->user,
-            &address->pass, &address->host, &address->port, &address->name);
-  if (address->host[0] == '~') {
-    address->passive = true;
-    address->host++;
-  }
-}
-
-static int pni_route(pn_messenger_t *messenger, const char *address)
-{
-  pn_address_t *addr = &messenger->address;
-  int err = pn_transform_apply(messenger->routes, address, addr->text);
-  if (err) return pn_error_format(messenger->error, PN_ERR,
-                                  "transformation error");
-  pni_parse(addr);
-  return 0;
-}
-
-pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *address, char **name)
-{
-  assert(messenger);
-  messenger->connection_error = 0;
-  pn_string_t *domain = messenger->domain;
-
-  int err = pni_route(messenger, address);
-  if (err) return NULL;
-
-  bool passive = messenger->address.passive;
-  char *scheme = messenger->address.scheme;
-  char *user = messenger->address.user;
-  char *pass = messenger->address.pass;
-  char *host = messenger->address.host;
-  char *port = messenger->address.port;
-  *name = messenger->address.name;
-
-  if (passive) {
-    for (size_t i = 0; i < pn_list_size(messenger->listeners); i++) {
-      pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_list_get(messenger->listeners, i);
-      if (pn_streq(host, ctx->host) && pn_streq(port, ctx->port)) {
-        return NULL;
-      }
-    }
-
-    pn_listener_ctx(messenger, scheme, host, port);
-    return NULL;
-  }
-
-  pn_string_set(domain, "");
-
-  if (user) {
-    pn_string_addf(domain, "%s@", user);
-  }
-  pn_string_addf(domain, "%s", host);
-  if (port) {
-    pn_string_addf(domain, ":%s", port);
-  }
-
-  for (size_t i = 0; i < pn_list_size(messenger->connections); i++) {
-    pn_connection_t *connection = (pn_connection_t *) pn_list_get(messenger->connections, i);
-    pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
-    if (pn_streq(scheme, ctx->scheme) && pn_streq(user, ctx->user) &&
-        pn_streq(pass, ctx->pass) && pn_streq(host, ctx->host) &&
-        pn_streq(port, ctx->port)) {
-      return connection;
-    }
-    const char *container = pn_connection_remote_container(connection);
-    if (pn_streq(container, pn_string_get(domain))) {
-      return connection;
-    }
-  }
-
-  pn_socket_t sock = pn_connect(messenger->io, host, port ? port : default_port(scheme));
-  if (sock == PN_INVALID_SOCKET) {
-    pn_error_copy(messenger->error, pn_io_error(messenger->io));
-    pn_error_format(messenger->error, PN_ERR, "CONNECTION ERROR (%s:%s): %s\n",
-                    messenger->address.host, messenger->address.port,
-                    pn_error_text(messenger->error));
-    return NULL;
-  }
-
-  pn_connection_t *connection =
-    pn_messenger_connection(messenger, sock, scheme, user, pass, host, port, NULL);
-  pn_transport_t *transport = pn_transport();
-  if (messenger->flags & PN_FLAGS_ALLOW_INSECURE_MECHS && user && pass) {
-      pn_sasl_t *s = pn_sasl(transport);
-      pn_sasl_set_allow_insecure_mechs(s, true);
-  }
-  pn_transport_bind(transport, connection);
-  pn_decref(transport);
-  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
-  pn_selectable_t *sel = ctx->selectable;
-  err = pn_transport_config(messenger, connection);
-  if (err) {
-    pn_selectable_free(sel);
-    messenger->connection_error = err;
-    return NULL;
-  }
-
-  pn_connection_open(connection);
-
-  return connection;
-}
-
-pn_link_t *pn_messenger_get_link(pn_messenger_t *messenger,
-                                           const char *address, bool sender)
-{
-  char *name = NULL;
-  pn_connection_t *connection = pn_messenger_resolve(messenger, address, &name);
-  if (!connection) return NULL;
-
-  pn_link_t *link = pn_link_head(connection, PN_LOCAL_ACTIVE);
-  while (link) {
-    if (pn_link_is_sender(link) == sender) {
-      const char *terminus = pn_link_is_sender(link) ?
-        pn_terminus_get_address(pn_link_target(link)) :
-        pn_terminus_get_address(pn_link_source(link));
-      if (pn_streq(name, terminus))
-        return link;
-    }
-    link = pn_link_next(link, PN_LOCAL_ACTIVE);
-  }
-  return NULL;
-}
-
-pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address,
-                             bool sender, pn_seconds_t timeout)
-{
-  char *name = NULL;
-  pn_connection_t *connection = pn_messenger_resolve(messenger, address, &name);
-  if (!connection)
-    return NULL;
-  pn_connection_ctx_t *cctx =
-      (pn_connection_ctx_t *)pn_connection_get_context(connection);
-
-  pn_link_t *link = pn_messenger_get_link(messenger, address, sender);
-  if (link)
-    return link;
-
-  pn_session_t *ssn = pn_session(connection);
-  pn_session_open(ssn);
-  if (sender) {
-    link = pn_sender(ssn, "sender-xxx");
-  } else {
-    if (name) {
-      link = pn_receiver(ssn, name);
-    } else {
-      link = pn_receiver(ssn, "");
-    }
-  }
-
-  if ((sender && pn_messenger_get_outgoing_window(messenger)) ||
-      (!sender && pn_messenger_get_incoming_window(messenger))) {
-    if (messenger->snd_settle_mode == -1) { /* Choose default based on sender/receiver */
-      /* For a sender use MIXED so the application can decide whether each
-         message is settled or not. For a receiver request UNSETTLED, since the
-         user set an incoming_window which means they want to decide settlement.
-      */
-      pn_link_set_snd_settle_mode(link, sender ? PN_SND_MIXED : PN_SND_UNSETTLED);
-    } else {                    /* Respect user setting */
-      pn_link_set_snd_settle_mode(link, (pn_snd_settle_mode_t)messenger->snd_settle_mode);
-    }
-    pn_link_set_rcv_settle_mode(link, messenger->rcv_settle_mode);
-  }
-  if (pn_streq(name, "#")) {
-    if (pn_link_is_sender(link)) {
-      pn_terminus_set_dynamic(pn_link_target(link), true);
-    } else {
-      pn_terminus_set_dynamic(pn_link_source(link), true);
-    }
-  } else {
-    pn_terminus_set_address(pn_link_target(link), name);
-    pn_terminus_set_address(pn_link_source(link), name);
-  }
-  link_ctx_setup( messenger, connection, link );
-
-  if (timeout > 0) {
-    pn_terminus_set_expiry_policy(pn_link_target(link), PN_EXPIRE_WITH_LINK);
-    pn_terminus_set_expiry_policy(pn_link_source(link), PN_EXPIRE_WITH_LINK);
-    pn_terminus_set_timeout(pn_link_target(link), timeout);
-    pn_terminus_set_timeout(pn_link_source(link), timeout);
-  }
-
-  if (!sender) {
-    pn_link_ctx_t *ctx = (pn_link_ctx_t *)pn_link_get_context(link);
-    assert( ctx );
-    ctx->subscription = pn_subscription(messenger, cctx->scheme, cctx->host,
-                                        cctx->port);
-  }
-  pn_link_open(link);
-  return link;
-}
-
-pn_link_t *pn_messenger_source(pn_messenger_t *messenger, const char *source,
-                               pn_seconds_t timeout)
-{
-  return pn_messenger_link(messenger, source, false, timeout);
-}
-
-pn_link_t *pn_messenger_target(pn_messenger_t *messenger, const char *target,
-                               pn_seconds_t timeout)
-{
-  return pn_messenger_link(messenger, target, true, timeout);
-}
-
-pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
-{
-  return pn_messenger_subscribe_ttl(messenger, source, 0);
-}
-
-/* Subscribe to `source`.
-
-   The address is first run through pni_route(); if that leaves an error code
-   on the messenger, NULL is returned. For a passive address the subscription
-   held by the listener context from pn_listener_ctx() is returned (`timeout`
-   is not used on this path). Otherwise `timeout` is forwarded to
-   pn_messenger_source() and the subscription stored in the resulting link's
-   context is returned. NULL is returned whenever a step yields nothing. */
-pn_subscription_t *pn_messenger_subscribe_ttl(pn_messenger_t *messenger,
-                                              const char *source,
-                                              pn_seconds_t timeout)
-{
-  pni_route(messenger, source);
-  if (pn_error_code(messenger->error)) return NULL;
-
-  if (messenger->address.passive) {
-    pn_listener_ctx_t *lctx = pn_listener_ctx(messenger,
-                                              messenger->address.scheme,
-                                              messenger->address.host,
-                                              messenger->address.port);
-    return lctx ? lctx->subscription : NULL;
-  }
-
-  pn_link_t *receiver = pn_messenger_source(messenger, source, timeout);
-  if (!receiver) return NULL;
-
-  pn_link_ctx_t *link_ctx = (pn_link_ctx_t *) pn_link_get_context(receiver);
-  if (!link_ctx) return NULL;
-  return link_ctx->subscription;
-}
-
-/* Return the window reported by pni_store_get_window() for the outgoing
-   store. */
-int pn_messenger_get_outgoing_window(pn_messenger_t *messenger)
-{
-  pni_store_t *store = messenger->outgoing;
-  return pni_store_get_window(store);
-}
-
-/* Set the window of the outgoing store via pni_store_set_window().
-   Always returns 0. */
-int pn_messenger_set_outgoing_window(pn_messenger_t *messenger, int window)
-{
-  pni_store_t *store = messenger->outgoing;
-  pni_store_set_window(store, window);
-  return 0;
-}
-
-/* Return the window reported by pni_store_get_window() for the incoming
-   store. */
-int pn_messenger_get_incoming_window(pn_messenger_t *messenger)
-{
-  pni_store_t *store = messenger->incoming;
-  return pni_store_get_window(store);
-}
-
-/* Set the window of the incoming store via pni_store_set_window().
-   Always returns 0. */
-int pn_messenger_set_incoming_window(pn_messenger_t *messenger, int window)
-{
-  pni_store_t *store = messenger->incoming;
-  pni_store_set_window(store, window);
-  return 0;
-}
-
-/* Expand a "~"-relative reply-to address on an outgoing message.
-
-   "~/path" becomes "amqp://<name>/path" and a bare "~" becomes
-   "amqp://<name>", where <name> is mng->name. Any other reply-to (including
-   none at all) is left untouched. If the expanded address does not fit the
-   on-stack buffer and heap allocation fails, the reply-to is left
-   unexpanded rather than writing through a NULL pointer. */
-static void outward_munge(pn_messenger_t *mng, pn_message_t *msg)
-{
-  const char *address = pn_message_get_reply_to(msg);
-  if (!address || address[0] != '~') return;
-
-  /* Text appended after "amqp://<name>": empty for "~", "/path" for "~/path". */
-  const char *suffix;
-  if (address[1] == '\0') {
-    suffix = "";
-  } else if (address[1] == '/') {
-    suffix = address + 1;
-  } else {
-    return;
-  }
-
-  /* "amqp://" + name + suffix + terminating NUL */
-  size_t needed = strlen("amqp://") + strlen(mng->name) + strlen(suffix) + 1;
-
-  char stackbuf[256];
-  char *heapbuf = NULL;
-  char *buf = stackbuf;
-  if (needed > sizeof(stackbuf)) {
-    heapbuf = (char *) malloc(needed);
-    if (!heapbuf) return;
-    buf = heapbuf;
-  }
-
-  snprintf(buf, needed, "amqp://%s%s", mng->name, suffix);
-  pn_message_set_reply_to(msg, buf);
-  free(heapbuf);
-}
-
-/* Take the entry pni_store_get() returns for `address` from the outgoing
-   store, mark it PN_STATUS_ABORTED and free it. Does nothing when there is
-   no such entry. Always returns 0. */
-int pni_bump_out(pn_messenger_t *messenger, const char *address)
-{
-  pni_entry_t *aborted = pni_store_get(messenger->outgoing, address);
-  if (aborted) {
-    pni_entry_set_status(aborted, PN_STATUS_ABORTED);
-    pni_entry_free(aborted);
-  }
-  return 0;
-}
-
-/* Send the entry pni_store_get() returns for `address` on `sender`.
-
-   When the outgoing store has no entry for `address`, the link is marked
-   drained and 0 is returned. Otherwise a delivery is created with a tag
-   made from the messenger's next_tag counter, the entry's bytes are handed
-   to pn_link_send() and the entry is freed. Returns 0 on success, or the
-   result of pn_error_format() (recorded on messenger->error) when
-   pn_link_send() reports a negative value. */
-int pni_pump_out(pn_messenger_t *messenger, const char *address,
-                 pn_link_t *sender)
-{
-  pni_entry_t *entry = pni_store_get(messenger->outgoing, address);
-  if (!entry) {
-    pn_link_drained(sender);
-    return 0;
-  }
-
-  pn_buffer_t *buf = pni_entry_bytes(entry);
-  pn_bytes_t bytes = pn_buffer_bytes(buf);
-  const char *encoded = bytes.start;
-  size_t size = bytes.size;
-
-  // XXX: proper tag
-  // Copy the counter's bytes into the tag with memcpy: storing a uint64_t
-  // through a pointer into a char array is not alignment/aliasing safe.
-  uint64_t next = messenger->next_tag++;
-  char tag[sizeof(next)];
-  memcpy(tag, &next, sizeof(next));
-  pn_delivery_t *d = pn_delivery(sender, pn_dtag(tag, sizeof(tag)));
-  pni_entry_set_delivery(entry, d);
-  ssize_t n = pn_link_send(sender, encoded, size);
-  if (n < 0) {
-    pni_entry_free(entry);
-    return pn_error_format(messenger->error, n, "send error: %s",
-                           pn_error_text(pn_link_error(sender)));
-  }
-
-  pn_link_advance(sender);
-  pni_entry_free(entry);
-  return 0;
-}
-
-/* Default address rewrite: drop user/password credentials.
-
-   If `address` contains an '@', it is copied into messenger->address and
-   parsed with pni_parse(). When the parse yields a user or a password, the
-   address is re-assembled into `dst` as [scheme://]host[:port][/name],
-   i.e. without the credentials. In every other case `dst` is left
-   untouched. */
-static void pni_default_rewrite(pn_messenger_t *messenger, const char *address,
-                                pn_string_t *dst)
-{
-  if (!address || !strstr(address, "@")) return;
-
-  pn_address_t *addr = &messenger->address;
-  int err = pn_string_set(addr->text, address);
-  if (err) {
-    assert(false);
-    return;  // with NDEBUG: do not parse text that could not be stored
-  }
-  pni_parse(addr);
-  if (!addr->user && !addr->pass) return;
-
-  pn_string_format(dst, "%s%s%s%s%s%s%s",
-                   addr->scheme ? addr->scheme : "",
-                   addr->scheme ? "://" : "",
-                   addr->host ? addr->host : "",
-                   addr->port ? ":" : "",
-                   addr->port ? addr->port : "",
-                   addr->name ? "/" : "",
-                   addr->name ? addr->name : "");
-}
-
-/* Rewrite the address of `msg` for the wire.
-
-   The current address is saved in messenger->original, then the messenger's
-   rewrite rules are applied into messenger->rewritten. When no rule matched,
-   pni_default_rewrite() is applied to the result instead. Finally the
-   message address is replaced by messenger->rewritten. */
-static void pni_rewrite(pn_messenger_t *messenger, pn_message_t *msg)
-{
-  const char *current = pn_message_get_address(msg);
-  pn_string_set(messenger->original, current);
-
-  int err = pn_transform_apply(messenger->rewrites, current,
-                               messenger->rewritten);
-  if (err) assert(false);
-
-  bool matched = pn_transform_matched(messenger->rewrites);
-  if (!matched) {
-    const char *unmatched = pn_string_get(messenger->rewritten);
-    pni_default_rewrite(messenger, unmatched, messenger->rewritten);
-  }
-
-  const char *result = pn_string_get(messenger->rewritten);
-  pn_message_set_address(msg, result);
-}
-
-/* Put the address saved in messenger->original back onto `msg`. */
-static void pni_restore(pn_messenger_t *messenger, pn_message_t *msg)
-{
-  const char *saved = pn_string_get(messenger->original);
-  pn_message_set_address(msg, saved);
-}
-
-/* Queue `msg` for sending.
-
-   The reply-to is expanded (outward_munge), an entry is created in the
-   outgoing store under the message's address and the outgoing tracker is
-   updated. The message is then encoded into the entry's buffer with its
-   address temporarily rewritten; the buffer is doubled for as long as the
-   encoder reports PN_OVERFLOW. The original address is restored before
-   returning on every path.
-
-   Returns PN_ARG_ERR for a NULL messenger or message, an error recorded on
-   messenger->error if the store, buffer growth or encoding fails (the store
-   entry is released in the latter two cases), the messenger's pending error
-   code if no sender link could be obtained, and otherwise the result of
-   pni_pump_out() / pni_bump_out() or 0. */
-int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
-{
-  if (!messenger) return PN_ARG_ERR;
-  if (!msg) return pn_error_set(messenger->error, PN_ARG_ERR, "null message");
-  outward_munge(messenger, msg);
-  const char *address = pn_message_get_address(msg);
-
-  pni_entry_t *entry = pni_store_put(messenger->outgoing, address);
-  if (!entry)
-    return pn_error_format(messenger->error, PN_ERR, "store error");
-
-  messenger->outgoing_tracker = pn_tracker(OUTGOING, pni_entry_track(entry));
-  pn_buffer_t *buf = pni_entry_bytes(entry);
-
-  pni_rewrite(messenger, msg);
-
-  char *encoded;
-  size_t size;
-  int err;
-  for (;;) {
-    encoded = pn_buffer_memory(buf).start;
-    size = pn_buffer_capacity(buf);
-    err = pn_message_encode(msg, encoded, &size);
-    if (err != PN_OVERFLOW) break;
-
-    err = pn_buffer_ensure(buf, 2*pn_buffer_capacity(buf));
-    if (err) {
-      pni_entry_free(entry);
-      pni_restore(messenger, msg);
-      return pn_error_format(messenger->error, err,
-                             "put: error growing buffer");
-    }
-  }
-
-  pni_restore(messenger, msg);
-  if (err) {
-    // Release the entry here too, as on the grow-failure path, so a
-    // half-built entry is not left behind in the outgoing store.
-    pni_entry_free(entry);
-    // pn_message_error() yields a pn_error_t *, so go through
-    // pn_error_text() to obtain a string for %s.
-    return pn_error_format(messenger->error, err, "encode error: %s",
-                           pn_error_text(pn_message_error(msg)));
-  }
-
-  pn_buffer_append(buf, encoded, size); // XXX
-
-  // The message's address string was overwritten and restored above, so the
-  // pointer read earlier may no longer be valid; fetch it again.
-  address = pn_message_get_address(msg);
-
-  pn_link_t *sender = pn_messenger_target(messenger, address, 0);
-  if (sender) {
-    return pni_pump_out(messenger, address, sender);
-  }
-
-  int code = pn_error_code(messenger->error);
-  if (code) {
-    return code;
-  }
-  if (messenger->connection_error) {
-    return pni_bump_out(messenger, address);
-  }
-  return 0;
-}
-
-/* Return the tracker stored in messenger->outgoing_tracker (set by
-   pn_messenger_put()). `messenger` must not be NULL. */
-pn_tracker_t pn_messenger_outgoing_tracker(pn_messenger_t *messenger)
-{
-  assert(messenger);
-  pn_tracker_t latest = messenger->outgoing_tracker;
-  return latest;
-}
-
-/* Select the store a tracker refers to: the outgoing store for an OUTGOING
-   tracker, the incoming store for anything else. */
-pni_store_t *pn_tracker_store(pn_messenger_t *messenger, pn_tracker_t tracker)
-{
-  return (pn_tracker_direction(tracker) == OUTGOING)
-    ? messenger->outgoing
-    : messenger->incoming;
-}
-
-/* Return the status of the store entry identified by `tracker`, or
-   PN_STATUS_UNKNOWN when the store has no such entry. */
-pn_status_t pn_messenger_status(pn_messenger_t *messenger,
-                                pn_tracker_t tracker)
-{
-  pni_store_t *store = pn_tracker_store(messenger, tracker);
-  pni_entry_t *found = pni_store_entry(store, pn_tracker_sequence(tracker));
-  if (!found) return PN_STATUS_UNKNOWN;
-  return pni_entry_get_status(found);
-}
-
-/* Return the delivery attached to the store entry identified by `tracker`,
-   or NULL when the store has no such entry. */
-pn_delivery_t *pn_messenger_delivery(pn_messenger_t *messenger,
-                                     pn_tracker_t tracker)
-{
-  pni_store_t *store = pn_tracker_store(messenger, tracker);
-  pni_entry_t *found = pni_store_entry(store, pn_tracker_sequence(tracker));
-  if (!found) return NULL;
-  return pni_entry_get_delivery(found);
-}
-
-/* Report whether the message identified by `tracker` is still buffered.
-
-   Returns false when the store has no entry for the tracker, true when the
-   entry has no delivery attached yet, and otherwise whatever
-   pn_delivery_buffered() reports for the entry's delivery. */
-bool pn_messenger_buffered(pn_messenger_t *messenger, pn_tracker_t tracker)
-{
-  pni_store_t *store = pn_tracker_store(messenger, tracker);
-  pni_entry_t *found = pni_store_entry(store, pn_tracker_sequence(tracker));
-  if (!found) return false;
-
-  pn_delivery_t *delivery = pni_entry_get_delivery(found);
-  if (!delivery) return true;
-
-  return pn_delivery_buffered(delivery);
-}
-
-/* Forward a settle request for `tracker` to pni_store_update() on the store
-   the tracker belongs to, passing PN_STATUS_UNKNOWN, `flags`, and true for
-   both trailing boolean arguments. Returns pni_store_update()'s result. */
-int pn_messenger_settle(pn_messenger_t *messenger, pn_tracker_t tracker,
-                        int flags)
-{
-  pni_store_t *store = pn_tracker_store(messenger, tracker);
-  pn_sequence_t id = pn_tracker_sequence(tracker);
-  return pni_store_update(store, id, PN_STATUS_UNKNOWN, flags, true, true);
-}
-
-// Predicate: true once pending output has dropped to the send threshold.
-//
-// Returns false as soon as any connection's transport is not quiesced.
-// Otherwise counts the outgoing store size, plus for every locally active
-// sender link its queued deliveries and its unsettled deliveries that have
-// neither a remote state nor been settled, and compares that count against
-// messenger->send_threshold.
-bool pn_messenger_sent(pn_messenger_t *messenger)
-{
-  int pending = pni_store_size(messenger->outgoing);
-
-  for (size_t idx = 0; idx < pn_list_size(messenger->connections); idx++)
-  {
-    pn_connection_t *conn =
-      (pn_connection_t *) pn_list_get(messenger->connections, idx);
-
-    // the transport must be done generating output
-    pn_transport_t *transport = pn_connection_transport(conn);
-    if (transport && !pn_transport_quiesced(transport)) {
-      return false;
-    }
-
-    for (pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
-         link;
-         link = pn_link_next(link, PN_LOCAL_ACTIVE))
-    {
-      if (!pn_link_is_sender(link)) continue;
-
-      pending += pn_link_queued(link);
-
-      for (pn_delivery_t *d = pn_unsettled_head(link);
-           d;
-           d = pn_unsettled_next(d))
-      {
-        if (!pn_delivery_remote_state(d) && !pn_delivery_settled(d)) {
-          pending++;
-        }
-      }
-    }
-  }
-
-  return pending <= messenger->send_threshold;
-}
-
-// Predicate used while receiving.
-//
-// True when the incoming store is non-empty, when any connection's work
-// list holds a delivery that is readable and not partial, or when the
-// messenger has neither connections nor listeners. False otherwise.
-bool pn_messenger_rcvd(pn_messenger_t *messenger)
-{
-  if (pni_store_size(messenger->incoming) > 0) return true;
-
-  for (size_t idx = 0; idx < pn_list_size(messenger->connections); idx++)
-  {
-    pn_connection_t *conn =
-      (pn_connection_t *) pn_list_get(messenger->connections, idx);
-
-    for (pn_delivery_t *d = pn_work_head(conn); d; d = pn_work_next(d)) {
-      if (pn_delivery_readable(d) && !pn_delivery_partial(d)) {
-        return true;
-      }
-    }
-  }
-
-  return !pn_list_size(messenger->connections) &&
-         !pn_list_size(messenger->listeners);
-}
-
-/* Predicate handed to pn_messenger_tsync(): reports messenger->worked. */
-static bool work_pred(pn_messenger_t *messenger)
-{
-  bool did_work = messenger->worked;
-  return did_work;
-}
-
-/* Clear messenger->worked and run pn_messenger_tsync() with work_pred and
-   `timeout`. Returns the tsync error if there is one; otherwise 1 when
-   messenger->worked ended up set and 0 when it did not. */
-int pn_messenger_work(pn_messenger_t *messenger, int timeout)
-{
-  messenger->worked = false;
-
-  int status = pn_messenger_tsync(messenger, work_pred, timeout);
-  if (status != 0) return status;
-
-  if (messenger->worked) {
-    return 1;
-  }
-  return 0;
-}
-
-/* Run pn_messenger_work() according to the messenger's blocking mode.
-
-   Blocking: uses messenger->timeout and returns the result unchanged.
-   Non-blocking: uses a timeout of 0 and reports PN_TIMEOUT as
-   PN_INPROGRESS; any other result is returned unchanged. */
-int pni_messenger_work(pn_messenger_t *messenger)
-{
-  if (messenger->blocking) {
-    return pn_messenger_work(messenger, messenger->timeout);
-  }
-
-  int status = pn_messenger_work(messenger, 0);
-  return (status == PN_TIMEOUT) ? PN_INPROGRESS : status;
-}
-
-/* Write a single byte ("x") to messenger->ctrl[1] using pn_write().
-   Returns 0 when pn_write() reports a positive count, otherwise the
-   non-positive value pn_write() returned. `messenger` must not be NULL. */
-int pn_messenger_interrupt(pn_messenger_t *messenger)
-{
-  assert(messenger);
-  ssize_t written = pn_write(messenger->io, messenger->ctrl[1], "x", 1);
-  if (written > 0) {
-    return 0;
-  }
-  return written;
-}
-
-/* Set the send threshold and synchronise on pn_messenger_sent().
-
-   With n == -1 the threshold is 0. Otherwise it is
-   pn_messenger_outgoing(messenger) - n, clamped so it is never negative.
-   Returns the result of pn_messenger_sync(). */
-int pn_messenger_send(pn_messenger_t *messenger, int n)
-{
-  if (n != -1) {
-    messenger->send_threshold = pn_messenger_outgoing(messenger) - n;
-    if (messenger->send_threshold < 0) {
-      messenger->send_threshold = 0;
-    }
-  } else {
-    messenger->send_threshold = 0;
-  }
-
-  return pn_messenger_sync(messenger, pn_messenger_sent);
-}
-
-/* Configure credit from `n`, then synchronise on pn_messenger_rcvd().
-
-   n == -2 selects LINK_CREDIT_MANUAL, n == -1 selects LINK_CREDIT_AUTO, and
-   any other value selects LINK_CREDIT_EXPLICIT with messenger->credit set to
-   n minus the already distributed credit (or 0 if n does not exceed it).
-
-   Returns PN_ARG_ERR for a NULL messenger; a PN_STATE_ERR ("no valid
-   sources") when the messenger is blocking and has neither listeners nor
-   connections, checked both up front and, if nothing is incoming, after the
-   sync; the sync error if there is one; otherwise 0. */
-int pn_messenger_recv(pn_messenger_t *messenger, int n)
-{
-  if (!messenger) return PN_ARG_ERR;
-
-  if (messenger->blocking && !pn_list_size(messenger->listeners)
-      && !pn_list_size(messenger->connections)) {
-    return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
-  }
-
-  // re-compute credit, and update credit scheduler
-  switch (n) {
-  case -2:
-    messenger->credit_mode = LINK_CREDIT_MANUAL;
-    break;
-  case -1:
-    messenger->credit_mode = LINK_CREDIT_AUTO;
-    break;
-  default:
-    messenger->credit_mode = LINK_CREDIT_EXPLICIT;
-    if (n > messenger->distributed) {
-      messenger->credit = n - messenger->distributed;
-    } else {
-      // cancel unallocated
-      messenger->credit = 0;
-    }
-    break;
-  }
-  pn_messenger_flow(messenger);
-
-  int status = pn_messenger_sync(messenger, pn_messenger_rcvd);
-  if (status) return status;
-
-  if (!pn_messenger_incoming(messenger) &&
-      messenger->blocking &&
-      !pn_list_size(messenger->listeners) &&
-      !pn_list_size(messenger->connections)) {
-    return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
-  }
-  return 0;
-}
-
-/* Return the sum of messenger->credit and messenger->distributed.
-   `messenger` must not be NULL. */
-int pn_messenger_receiving(pn_messenger_t *messenger)
-{
-  assert(messenger);
-  int outstanding = messenger->credit + messenger->distributed;
-  return outstanding;
-}
-
-/* Take the entry pni_store_get() returns from the incoming store and, if
-   `msg` is non-NULL, decode it into `msg`.
-
-   Updates messenger->incoming_tracker and messenger->incoming_subscription
-   from the entry, which is freed before returning. Returns PN_ARG_ERR for a
-   NULL messenger, PN_EOS when the incoming store yields no entry, a decode
-   error recorded on messenger->error, or 0 on success. */
-int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
-{
-  if (!messenger) return PN_ARG_ERR;
-
-  pni_entry_t *entry = pni_store_get(messenger->incoming, NULL);
-  // XXX: need to drain credit before returning EOS
-  if (!entry) return PN_EOS;
-
-  messenger->incoming_tracker = pn_tracker(INCOMING, pni_entry_track(entry));
-  pn_buffer_t *buf = pni_entry_bytes(entry);
-  pn_bytes_t bytes = pn_buffer_bytes(buf);
-  const char *encoded = bytes.start;
-  size_t size = bytes.size;
-
-  messenger->incoming_subscription =
-    (pn_subscription_t *) pni_entry_get_context(entry);
-
-  int err = msg ? pn_message_decode(msg, encoded, size) : 0;
-  pni_entry_free(entry);
-  if (err) {
-    // pn_message_error() yields a pn_error_t *, so go through
-    // pn_error_text() to obtain a string for %s.
-    return pn_error_format(messenger->error, err,
-                           "error decoding message: %s",
-                           pn_error_text(pn_message_error(msg)));
-  }
-  return 0;
-}
-
-/* Return the tracker stored in messenger->incoming_tracker (set by
-   pn_messenger_get()). `messenger` must not be NULL. */
-pn_tracker_t pn_messenger_incoming_tracker(pn_messenger_t *messenger)
-{
-  assert(messenger);
-  pn_tracker_t latest = messenger->incoming_tracker;
-  return latest;
-}
-
-/* Return the subscription stored in messenger->incoming_subscription (set
-   by pn_messenger_get()). `messenger` must not be NULL. */
-pn_subscription_t *
-pn_messenger_incoming_subscription(pn_messenger_t *messenger)
-{
-  assert(messenger);
-  pn_subscription_t *latest = messenger->incoming_subscription;
-  return latest;
-}
-
-/* Mark the incoming message identified by `tracker` as PN_STATUS_ACCEPTED
-   through pni_store_update(). A tracker whose direction is not INCOMING is
-   rejected with PN_ARG_ERR recorded on messenger->error. */
-int pn_messenger_accept(pn_messenger_t *messenger, pn_tracker_t tracker,
-                        int flags)
-{
-  if (pn_tracker_direction(tracker) == INCOMING) {
-    pn_sequence_t id = pn_tracker_sequence(tracker);
-    return pni_store_update(messenger->incoming, id, PN_STATUS_ACCEPTED,
-                            flags, false, false);
-  }
-
-  return pn_error_format(messenger->error, PN_ARG_ERR,
-                         "invalid tracker, incoming tracker required");
-}
-
-/* Mark the incoming message identified by `tracker` as PN_STATUS_REJECTED
-   through pni_store_update(). A tracker whose direction is not INCOMING is
-   rejected with PN_ARG_ERR recorded on messenger->error. */
-int pn_messenger_reject(pn_messenger_t *messenger, pn_tracker_t tracker,
-                        int flags)
-{
-  if (pn_tracker_direction(tracker) == INCOMING) {
-    pn_sequence_t id = pn_tracker_sequence(tracker);
-    return pni_store_update(messenger->incoming, id, PN_STATUS_REJECTED,
-                            flags, false, false);
-  }
-
-  return pn_error_format(messenger->error, PN_ARG_ERR,
-                         "invalid tracker, incoming tracker required");
-}
-
-/* Return the link of the delivery attached to the store entry identified by
-   `tracker`, or NULL when there is no such entry or it has no delivery. */
-pn_link_t *pn_messenger_tracker_link(pn_messenger_t *messenger,
-                                               pn_tracker_t tracker)
-{
-  pni_store_t *store = pn_tracker_store(messenger, tracker);
-  pni_entry_t *found = pni_store_entry(store, pn_tracker_sequence(tracker));
-  if (!found) return NULL;
-
-  pn_delivery_t *delivery = pni_entry_get_delivery(found);
-  if (!delivery) return NULL;
-
-  return pn_delivery_link(delivery);
-}
-
-/* Sum pn_link_queued() over the locally active links of every connection,
-   counting only sender links when `sender` is true and only non-sender
-   links when it is false. Returns 0 for a NULL messenger. */
-int pn_messenger_queued(pn_messenger_t *messenger, bool sender)
-{
-  if (!messenger) return 0;
-
-  int queued = 0;
-
-  for (size_t idx = 0; idx < pn_list_size(messenger->connections); idx++) {
-    pn_connection_t *conn =
-      (pn_connection_t *) pn_list_get(messenger->connections, idx);
-
-    for (pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
-         link;
-         link = pn_link_next(link, PN_LOCAL_ACTIVE))
-    {
-      bool link_is_sender = pn_link_is_sender(link);
-      if (link_is_sender == sender) {
-        queued += pn_link_queued(link);
-      }
-    }
-  }
-
-  return queued;
-}
-
-/* Return the size of the outgoing store plus the deliveries queued on
-   sender links (pn_messenger_queued() with sender == true). */
-int pn_messenger_outgoing(pn_messenger_t *messenger)
-{
-  int stored = pni_store_size(messenger->outgoing);
-  int on_links = pn_messenger_queued(messenger, true);
-  return stored + on_links;
-}
-
-/* Return the size of the incoming store plus the deliveries queued on
-   non-sender links (pn_messenger_queued() with sender == false). */
-int pn_messenger_incoming(pn_messenger_t *messenger)
-{
-  int stored = pni_store_size(messenger->incoming);
-  int on_links = pn_messenger_queued(messenger, false);
-  return stored + on_links;
-}
-
-/* Add a (pattern, address) rule to messenger->routes via
-   pn_transform_rule(). Always returns 0. */
-int pn_messenger_route(pn_messenger_t *messenger, const char *pattern,
-                       const char *address)
-{
-  pn_transform_t *rules = messenger->routes;
-  pn_transform_rule(rules, pattern, address);
-  return 0;
-}
-
-/* Add a (pattern, address) rule to messenger->rewrites via
-   pn_transform_rule(). Always returns 0. */
-int pn_messenger_rewrite(pn_messenger_t *messenger, const char *pattern,
-                         const char *address)
-{
-  pn_transform_t *rules = messenger->rewrites;
-  pn_transform_rule(rules, pattern, address);
-  return 0;
-}
-
-/* Update the messenger's flags.
-
-   Passing 0 clears all flags. Otherwise `flags` must consist solely of
-   PN_FLAGS_CHECK_ROUTES and/or PN_FLAGS_ALLOW_INSECURE_MECHS, and those bits
-   are OR-ed into the current flags. Returns 0 on success, or PN_ARG_ERR for
-   a NULL messenger or when `flags` contains any other bit (in which case
-   the stored flags are left unchanged). */
-int pn_messenger_set_flags(pn_messenger_t *messenger, const int flags)
-{
-  if (!messenger)
-    return PN_ARG_ERR;
-
-  const int known = PN_FLAGS_CHECK_ROUTES | PN_FLAGS_ALLOW_INSECURE_MECHS;
-
-  if (flags == 0) {
-    messenger->flags = 0;
-    return 0;
-  }
-  // Reject the whole request if any unknown bit is present, so that unknown
-  // bits can never end up stored alongside a valid one.
-  if (flags & ~known) {
-    return PN_ARG_ERR;
-  }
-  messenger->flags |= flags;
-  return 0;
-}
-
-/* Return messenger->flags, or 0 for a NULL messenger. */
-int pn_messenger_get_flags(pn_messenger_t *messenger)
-{
-  if (!messenger) {
-    return 0;
-  }
-  return messenger->flags;
-}
-
-/* Store `mode` in messenger->snd_settle_mode. Returns 0, or PN_ARG_ERR for
-   a NULL messenger. */
-int pn_messenger_set_snd_settle_mode(pn_messenger_t *messenger,
-                                     const pn_snd_settle_mode_t mode)
-{
-  if (messenger) {
-    messenger->snd_settle_mode = mode;
-    return 0;
-  }
-  return PN_ARG_ERR;
-}
-
-/* Store `mode` in messenger->rcv_settle_mode. Returns 0, or PN_ARG_ERR for
-   a NULL messenger. */
-int pn_messenger_set_rcv_settle_mode(pn_messenger_t *messenger,
-                                     const pn_rcv_settle_mode_t mode)
-{
-  if (messenger) {
-    messenger->rcv_settle_mode = mode;
-    return 0;
-  }
-  return PN_ARG_ERR;
-}
-
-/* Store `tracer` in messenger->tracer. Both arguments are asserted to be
-   non-NULL. */
-void pn_messenger_set_tracer(pn_messenger_t *messenger, pn_tracer_t tracer)
-{
-  assert(messenger);
-  assert(tracer);
-  messenger->tracer = tracer;
-}
-
-/* Return the remote idle timeout of the connection matching `address`.
-
-   `address` is parsed with pni_parse() and compared by scheme, host and
-   port against the context of each of the messenger's connections. For the
-   first match, the transport's remote idle timeout is returned if the
-   connection has a transport. When nothing matches (or the match has no
-   transport) the result is (pn_millis_t) -1. A NULL messenger yields
-   PN_ARG_ERR converted to pn_millis_t.
-
-   The temporary string holding the parsed address is released before
-   returning. */
-pn_millis_t pn_messenger_get_remote_idle_timeout(pn_messenger_t *messenger,
-                                                 const char *address)
-{
-  if (!messenger)
-    return PN_ARG_ERR;
-
-  pn_address_t addr = {0};
-  addr.text = pn_string(address);
-  pni_parse(&addr);
-
-  pn_millis_t timeout = (pn_millis_t) -1;
-  for (size_t i = 0; i < pn_list_size(messenger->connections); i++) {
-    pn_connection_t *connection =
-        (pn_connection_t *)pn_list_get(messenger->connections, i);
-    pn_connection_ctx_t *ctx =
-        (pn_connection_ctx_t *)pn_connection_get_context(connection);
-    if (pn_streq(addr.scheme, ctx->scheme) && pn_streq(addr.host, ctx->host) &&
-        pn_streq(addr.port, ctx->port)) {
-      pn_transport_t *transport = pn_connection_transport(connection);
-      if (transport)
-        timeout = pn_transport_get_remote_idle_timeout(transport);
-      break;
-    }
-  }
-
-  // Free the string only after the loop: it is the source the address was
-  // parsed from, so addr.scheme/host/port are presumably pointers into it.
-  pn_free(addr.text);
-  return timeout;
-}
-
-/* Store `mode` in messenger->ssl_peer_authentication_mode. Returns 0, or
-   PN_ARG_ERR for a NULL messenger. */
-int
-pn_messenger_set_ssl_peer_authentication_mode(pn_messenger_t *messenger,
-                                              const pn_ssl_verify_mode_t mode)
-{
-  if (messenger) {
-    messenger->ssl_peer_authentication_mode = mode;
-    return 0;
-  }
-  return PN_ARG_ERR;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/messenger/messenger.h
----------------------------------------------------------------------
diff --git a/proton-c/src/messenger/messenger.h 
b/proton-c/src/messenger/messenger.h
deleted file mode 100644
index 01b1838..0000000
--- a/proton-c/src/messenger/messenger.h
+++ /dev/null
@@ -1,30 +0,0 @@
-#ifndef _PROTON_MESSENGER_H
-#define _PROTON_MESSENGER_H 1
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/messenger.h>
-
-/* Internal: presumably registers `subscription` with `messenger`.
-   NOTE(review): the definition is not part of this header; confirm the
-   exact behaviour and the meaning of the return value there. */
-int pni_messenger_add_subscription(pn_messenger_t *messenger, 
pn_subscription_t *subscription);
-/* Internal: run pn_messenger_work() according to the messenger's blocking
-   mode. A blocking messenger uses its configured timeout; a non-blocking
-   one uses a timeout of 0 and has PN_TIMEOUT reported as PN_INPROGRESS. */
-int pni_messenger_work(pn_messenger_t *messenger);
-
-#endif /* messenger.h */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to