http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/messenger/messenger.c
----------------------------------------------------------------------
diff --git a/proton-c/src/messenger/messenger.c b/proton-c/src/messenger/messenger.c
index 29b2eeb..0e2488b 100644
--- a/proton-c/src/messenger/messenger.c
+++ b/proton-c/src/messenger/messenger.c
@@ -31,6 +31,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
+
 #include "../util.h"
 #include "../platform.h"
 #include "../platform_fmt.h"
@@ -60,30 +61,20 @@ typedef  enum {
 } pn_link_credit_mode_t;
 
 struct pn_messenger_t {
+  pn_address_t address;
   char *name;
   char *certificate;
   char *private_key;
   char *password;
   char *trusted_certificates;
-  int timeout;
-  bool blocking;
-  bool passive;
   pn_io_t *io;
   pn_list_t *pending; // pending selectables
   pn_selectable_t *interruptor;
-  bool interrupted;
   pn_socket_t ctrl[2];
   pn_list_t *listeners;
   pn_list_t *connections;
   pn_selector_t *selector;
   pn_collector_t *collector;
-  int send_threshold;
-  pn_link_credit_mode_t credit_mode;
-  int credit_batch;  // when LINK_CREDIT_AUTO
-  int credit;        // available
-  int distributed;   // credit
-  int receivers;     // # receiver links
-  int draining;      // # links in drain state
   pn_list_t *credited;
   pn_list_t *blocked;
   pn_timestamp_t next_drain;
@@ -95,13 +86,24 @@ struct pn_messenger_t {
   pn_error_t *error;
   pn_transform_t *routes;
   pn_transform_t *rewrites;
-  pn_address_t address;
   pn_tracker_t outgoing_tracker;
   pn_tracker_t incoming_tracker;
   pn_string_t *original;
   pn_string_t *rewritten;
-  bool worked;
+  pn_string_t *domain;
+  int timeout;
+  int send_threshold;
+  pn_link_credit_mode_t credit_mode;
+  int credit_batch;  // when LINK_CREDIT_AUTO
+  int credit;        // available
+  int distributed;   // credit
+  int receivers;     // # receiver links
+  int draining;      // # links in drain state
   int connection_error;
+  bool blocking;
+  bool passive;
+  bool interrupted;
+  bool worked;
 };
 
 #define CTX_HEAD                                \
@@ -611,7 +613,7 @@ pn_messenger_t *pn_messenger(const char *name)
     pni_selectable_set_context(m->interruptor, m);
     m->listeners = pn_list(0, 0);
     m->connections = pn_list(0, 0);
-    m->selector = pn_selector();
+    m->selector = pn_io_selector(m->io);
     m->collector = pn_collector();
     m->credit_mode = LINK_CREDIT_EXPLICIT;
     m->credit_batch = 1024;
@@ -635,6 +637,7 @@ pn_messenger_t *pn_messenger(const char *name)
     m->address.text = pn_string(NULL);
     m->original = pn_string(NULL);
     m->rewritten = pn_string(NULL);
+    m->domain = pn_string(NULL);
     m->connection_error = 0;
   }
 
@@ -775,6 +778,7 @@ static void pni_reclaim(pn_messenger_t *messenger)
 void pn_messenger_free(pn_messenger_t *messenger)
 {
   if (messenger) {
+    pn_free(messenger->domain);
     pn_free(messenger->rewritten);
     pn_free(messenger->original);
     pn_free(messenger->address.text);
@@ -971,7 +975,7 @@ int pni_pump_in(pn_messenger_t *messenger, const char *address, pn_link_t *recei
   size_t pending = pn_delivery_pending(d);
   int err = pn_buffer_ensure(buf, pending + 1);
   if (err) return pn_error_format(messenger->error, err, "get: error growing buffer");
-  char *encoded = pn_buffer_bytes(buf).start;
+  char *encoded = pn_buffer_memory(buf).start;
   ssize_t n = pn_link_recv(receiver, encoded, pending);
   if (n != (ssize_t) pending) {
     return pn_error_format(messenger->error, n,
@@ -1221,16 +1225,31 @@ int pn_messenger_process_events(pn_messenger_t *messenger)
   while ((event = pn_collector_peek(messenger->collector))) {
     processed++;
     switch (pn_event_type(event)) {
-    case PN_CONNECTION_REMOTE_STATE:
-    case PN_CONNECTION_LOCAL_STATE:
+    case PN_CONNECTION_INIT:
+      //printf("connection created: %p\n", (void *) pn_event_connection(event));
+      break;
+    case PN_SESSION_INIT:
+      //printf("session created: %p\n", (void *) pn_event_session(event));
+      break;
+    case PN_LINK_INIT:
+      //printf("link created: %p\n", (void *) pn_event_link(event));
+      break;
+    case PN_CONNECTION_REMOTE_OPEN:
+    case PN_CONNECTION_REMOTE_CLOSE:
+    case PN_CONNECTION_OPEN:
+    case PN_CONNECTION_CLOSE:
       pn_messenger_process_connection(messenger, event);
       break;
-    case PN_SESSION_REMOTE_STATE:
-    case PN_SESSION_LOCAL_STATE:
+    case PN_SESSION_REMOTE_OPEN:
+    case PN_SESSION_REMOTE_CLOSE:
+    case PN_SESSION_OPEN:
+    case PN_SESSION_CLOSE:
       pn_messenger_process_session(messenger, event);
       break;
-    case PN_LINK_REMOTE_STATE:
-    case PN_LINK_LOCAL_STATE:
+    case PN_LINK_REMOTE_OPEN:
+    case PN_LINK_REMOTE_CLOSE:
+    case PN_LINK_OPEN:
+    case PN_LINK_CLOSE:
       pn_messenger_process_link(messenger, event);
       break;
     case PN_LINK_FLOW:
@@ -1244,6 +1263,12 @@ int pn_messenger_process_events(pn_messenger_t *messenger)
       break;
     case PN_EVENT_NONE:
       break;
+    case PN_CONNECTION_FINAL:
+      break;
+    case PN_SESSION_FINAL:
+      break;
+    case PN_LINK_FINAL:
+      break;
     }
     pn_collector_pop(messenger->collector);
   }
@@ -1251,8 +1276,37 @@ int pn_messenger_process_events(pn_messenger_t *messenger)
   return processed;
 }
 
+/**
+ * Function to invoke AMQP related timer events, such as a heartbeat to prevent
+ * remote_idle timeout events
+ */
+static void pni_messenger_tick(pn_messenger_t *messenger)
+{
+  for (size_t i = 0; i < pn_list_size(messenger->connections); i++) {
+    pn_connection_t *connection =
+        (pn_connection_t *)pn_list_get(messenger->connections, i);
+    pn_transport_t *transport = pn_connection_transport(connection);
+    if (transport) {
+      pn_transport_tick(transport, pn_i_now());
+
+      // if there is pending data, such as an empty heartbeat frame, call
+      // process events. This should kick off the chain of selectables for
+      // reading/writing.
+      ssize_t pending = pn_transport_pending(transport);
+      if (pending > 0) {
+        pn_connection_ctx_t *cctx =
+            (pn_connection_ctx_t *)pn_connection_get_context(connection);
+        pn_messenger_process_events(messenger);
+        pn_messenger_flow(messenger);
+        pni_conn_modified(pni_context(cctx->selectable));
+      }
+    }
+  }
+}
+
 int pn_messenger_process(pn_messenger_t *messenger)
 {
+  bool doMessengerTick = true;
   pn_selectable_t *sel;
   int events;
   while ((sel = pn_selector_next(messenger->selector, &events))) {
@@ -1261,12 +1315,17 @@ int pn_messenger_process(pn_messenger_t *messenger)
     }
     if (events & PN_WRITABLE) {
       pn_selectable_writable(sel);
+      doMessengerTick = false;
     }
     if (events & PN_EXPIRED) {
       pn_selectable_expired(sel);
     }
   }
-
+  // ensure timer events are processed. Cannot call this inside the while loop
+  // as the timer events are not seen by the selector
+  if (doMessengerTick) {
+    pni_messenger_tick(messenger);
+  }
   if (messenger->interrupted) {
     messenger->interrupted = false;
     return PN_INTR;
@@ -1429,12 +1488,7 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *add
 {
   assert(messenger);
   messenger->connection_error = 0;
-  char domain[1024];
-  if (address && sizeof(domain) < strlen(address) + 1) {
-    pn_error_format(messenger->error, PN_ERR,
-                    "address exceeded maximum length: %s", address);
-    return NULL;
-  }
+  pn_string_t *domain = messenger->domain;
 
   int err = pni_route(messenger, address);
   if (err) return NULL;
@@ -1459,16 +1513,14 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *add
     return NULL;
   }
 
-  domain[0] = '\0';
+  pn_string_set(domain, "");
 
   if (user) {
-    strcat(domain, user);
-    strcat(domain, "@");
+    pn_string_addf(domain, "%s@", user);
   }
-  strcat(domain, host);
+  pn_string_addf(domain, "%s", host);
   if (port) {
-    strcat(domain, ":");
-    strcat(domain, port);
+    pn_string_addf(domain, ":%s", port);
   }
 
   for (size_t i = 0; i < pn_list_size(messenger->connections); i++) {
@@ -1480,7 +1532,7 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *add
       return connection;
     }
     const char *container = pn_connection_remote_container(connection);
-    if (pn_streq(container, domain)) {
+    if (pn_streq(container, pn_string_get(domain))) {
       return connection;
     }
   }
@@ -1529,7 +1581,16 @@ pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, boo
 
   pn_session_t *ssn = pn_session(connection);
   pn_session_open(ssn);
-  link = sender ? pn_sender(ssn, "sender-xxx") : pn_receiver(ssn, "receiver-xxx");
+  if (sender) {
+    link = pn_sender(ssn, "sender-xxx");
+  } else {
+    if (name) {
+      link = pn_receiver(ssn, name);
+    } else {
+      link = pn_receiver(ssn, "");
+    }
+  }
+
   if ((sender && pn_messenger_get_outgoing_window(messenger)) ||
       (!sender && pn_messenger_get_incoming_window(messenger))) {
     // use explicit settlement via dispositions (not pre-settled)
@@ -1662,7 +1723,7 @@ int pni_pump_out(pn_messenger_t *messenger, const char *address, pn_link_t *send
 
   pn_buffer_t *buf = pni_entry_bytes(entry);
   pn_bytes_t bytes = pn_buffer_bytes(buf);
-  char *encoded = bytes.start;
+  const char *encoded = bytes.start;
   size_t size = bytes.size;
 
   // XXX: proper tag
@@ -1742,7 +1803,7 @@ int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
 
   pni_rewrite(messenger, msg);
   while (true) {
-    char *encoded = pn_buffer_bytes(buf).start;
+    char *encoded = pn_buffer_memory(buf).start;
     size_t size = pn_buffer_capacity(buf);
     int err = pn_message_encode(msg, encoded, &size);
     if (err == PN_OVERFLOW) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/messenger/store.c
----------------------------------------------------------------------
diff --git a/proton-c/src/messenger/store.c b/proton-c/src/messenger/store.c
index e36c5bb..88d6a5d 100644
--- a/proton-c/src/messenger/store.c
+++ b/proton-c/src/messenger/store.c
@@ -34,36 +34,36 @@
 typedef struct pni_stream_t pni_stream_t;
 
 struct pni_store_t {
-  size_t size;
   pni_stream_t *streams;
   pni_entry_t *store_head;
   pni_entry_t *store_tail;
+  pn_hash_t *tracked;
+  size_t size;
   int window;
   pn_sequence_t lwm;
   pn_sequence_t hwm;
-  pn_hash_t *tracked;
 };
 
 struct pni_stream_t {
   pni_store_t *store;
-  char address[1024]; // XXX
+  pn_string_t *address;
   pni_entry_t *stream_head;
   pni_entry_t *stream_tail;
   pni_stream_t *next;
 };
 
 struct pni_entry_t {
-  pn_sequence_t id;
   pni_stream_t *stream;
-  bool free;
   pni_entry_t *stream_next;
   pni_entry_t *stream_prev;
   pni_entry_t *store_next;
   pni_entry_t *store_prev;
-  pn_status_t status;
   pn_buffer_t *bytes;
   pn_delivery_t *delivery;
   void *context;
+  pn_status_t status;
+  pn_sequence_t id;
+  bool free;
 };
 
 void pni_entry_finalize(void *object)
@@ -104,13 +104,11 @@ pni_stream_t *pni_stream(pni_store_t *store, const char *address, bool create)
 {
   assert(store);
   assert(address);
-  // XXX
-  if (strlen(address) >= 1024) return NULL;
 
   pni_stream_t *prev = NULL;
   pni_stream_t *stream = store->streams;
   while (stream) {
-    if (!strcmp(stream->address, address)) {
+    if (!strcmp(pn_string_get(stream->address), address)) {
       return stream;
     }
     prev = stream;
@@ -120,7 +118,7 @@ pni_stream_t *pni_stream(pni_store_t *store, const char *address, bool create)
   if (create) {
     stream = (pni_stream_t *) malloc(sizeof(pni_stream_t));
     stream->store = store;
-    strcpy(stream->address, address);
+    stream->address = pn_string(address);
     stream->stream_head = NULL;
     stream->stream_tail = NULL;
     stream->next = NULL;
@@ -169,6 +167,8 @@ void pni_stream_free(pni_stream_t *stream)
   while ((entry = LL_HEAD(stream, stream))) {
     pni_entry_free(entry);
   }
+  pn_free(stream->address);
+  stream->address = NULL;
   free(stream);
 }
 
@@ -205,7 +205,7 @@ pni_stream_t *pni_stream_get(pni_store_t *store, const char *address)
 pni_entry_t *pni_store_put(pni_store_t *store, const char *address)
 {
   assert(store);
-  static pn_class_t clazz = PN_CLASS(pni_entry);
+  static const pn_class_t clazz = PN_CLASS(pni_entry);
 
   if (!address) address = "";
   pni_stream_t *stream = pni_stream_put(store, address);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/messenger/subscription.c
----------------------------------------------------------------------
diff --git a/proton-c/src/messenger/subscription.c b/proton-c/src/messenger/subscription.c
index 95f3f09..346a23f 100644
--- a/proton-c/src/messenger/subscription.c
+++ b/proton-c/src/messenger/subscription.c
@@ -64,7 +64,7 @@ pn_subscription_t *pn_subscription(pn_messenger_t *messenger,
                                    const char *host,
                                    const char *port)
 {
-  static pn_class_t clazz = PN_CLASS(pn_subscription);
+  static const pn_class_t clazz = PN_CLASS(pn_subscription);
   pn_subscription_t *sub = (pn_subscription_t *) pn_new(sizeof(pn_subscription_t), &clazz);
   sub->messenger = messenger;
   pn_string_set(sub->scheme, scheme);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/messenger/transform.c
----------------------------------------------------------------------
diff --git a/proton-c/src/messenger/transform.c b/proton-c/src/messenger/transform.c
index c9d2c14..801eb10 100644
--- a/proton-c/src/messenger/transform.c
+++ b/proton-c/src/messenger/transform.c
@@ -62,7 +62,7 @@ static void pn_rule_finalize(void *object)
 
 pn_rule_t *pn_rule(const char *pattern, const char *substitution)
 {
-  static pn_class_t clazz = PN_CLASS(pn_rule);
+  static const pn_class_t clazz = PN_CLASS(pn_rule);
   pn_rule_t *rule = (pn_rule_t *) pn_new(sizeof(pn_rule_t), &clazz);
   rule->pattern = pn_string(pattern);
   rule->substitution = pn_string(substitution);
@@ -82,7 +82,7 @@ static void pn_transform_finalize(void *object)
 
 pn_transform_t *pn_transform()
 {
-  static pn_class_t clazz = PN_CLASS(pn_transform);
+  static const pn_class_t clazz = PN_CLASS(pn_transform);
   pn_transform_t *transform = (pn_transform_t *) pn_new(sizeof(pn_transform_t), &clazz);
   transform->rules = pn_list(0, PN_REFCOUNT);
   transform->matched = false;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/object/object.c
----------------------------------------------------------------------
diff --git a/proton-c/src/object/object.c b/proton-c/src/object/object.c
index dd5fd17..d1b0e43 100644
--- a/proton-c/src/object/object.c
+++ b/proton-c/src/object/object.c
@@ -29,14 +29,19 @@
 #include <ctype.h>
 
 typedef struct {
-  pn_class_t *clazz;
+  const pn_class_t *clazz;
   int refcount;
 } pni_head_t;
 
 #define pni_head(PTR) \
   (((pni_head_t *) (PTR)) - 1)
 
-void *pn_new(size_t size, pn_class_t *clazz)
+void *pn_new(size_t size, const pn_class_t *clazz)
+{
+  return pn_new2(size, clazz, NULL);
+}
+
+void *pn_new2(size_t size, const pn_class_t *clazz, void *from)
 {
   pni_head_t *head = (pni_head_t *) malloc(sizeof(pni_head_t) + size);
   void *object = head + 1;
@@ -44,7 +49,7 @@ void *pn_new(size_t size, pn_class_t *clazz)
   return object;
 }
 
-void pn_initialize(void *object, pn_class_t *clazz)
+void pn_initialize(void *object, const pn_class_t *clazz)
 {
   pni_head_t *head = pni_head(object);
   head->clazz = clazz;
@@ -54,15 +59,22 @@ void pn_initialize(void *object, pn_class_t *clazz)
   }
 }
 
-void *pn_incref(void *object)
-{
+void *pn_incref(void *object) {
+  return pn_incref2(object, NULL);
+}
+
+void *pn_incref2(void *object, void *from) {
   if (object) {
     pni_head(object)->refcount++;
   }
   return object;
 }
 
-void pn_decref(void *object)
+void pn_decref(void *object) {
+  pn_decref2(object, NULL);
+}
+
+void pn_decref2(void *object, void *from)
 {
   if (object) {
     pni_head_t *head = pni_head(object);
@@ -70,7 +82,11 @@ void pn_decref(void *object)
     head->refcount--;
     if (!head->refcount) {
       pn_finalize(object);
-      free(head);
+      // Check the refcount again in case finalize created a new
+      // reference.
+      if (!head->refcount) {
+        free(head);
+      }
     }
   }
 }
@@ -83,7 +99,6 @@ void pn_finalize(void *object)
     if (head->clazz && head->clazz->finalize) {
       head->clazz->finalize(object);
     }
-    head->refcount = 0;
   }
 }
 
@@ -101,7 +116,7 @@ void pn_free(void *object)
   }
 }
 
-pn_class_t *pn_class(void *object)
+const pn_class_t *pn_class(void *object)
 {
   assert(object);
   return pni_head(object)->clazz;
@@ -127,7 +142,7 @@ intptr_t pn_compare(void *a, void *b)
     pni_head_t *hb = pni_head(b);
 
     if (ha->clazz && hb->clazz && ha->clazz == hb->clazz) {
-      pn_class_t *clazz = ha->clazz;
+      const pn_class_t *clazz = ha->clazz;
       if (clazz->compare) {
         return clazz->compare(a, b);
       }
@@ -150,13 +165,20 @@ int pn_inspect(void *object, pn_string_t *dst)
 
   if (object) {
     pni_head_t *head = pni_head(object);
+    const char *name;
     if (head->clazz) {
-      pn_class_t *clazz = head->clazz;
+      const pn_class_t *clazz = head->clazz;
       if (clazz->inspect) {
         return clazz->inspect(object, dst);
+      } else if (clazz->name) {
+        name = clazz->name;
+      } else {
+        name = "object";
       }
+    } else {
+      name = "object";
     }
-    return pn_string_addf(dst, "object<%p>", object);
+    return pn_string_addf(dst, "%s<%p>", name, object);
   } else {
     return pn_string_addf(dst, "(null)");
   }
@@ -185,9 +207,9 @@ void pn_list_set(pn_list_t *list, int index, void *value)
 {
   assert(list); assert(list->size);
   void *old = list->elements[index % list->size];
-  if (list->options & PN_REFCOUNT) pn_decref(old);
+  if (list->options & PN_REFCOUNT) pn_decref2(old, list);
   list->elements[index % list->size] = value;
-  if (list->options & PN_REFCOUNT) pn_incref(value);
+  if (list->options & PN_REFCOUNT) pn_incref2(value, list);
 }
 
 void pn_list_ensure(pn_list_t *list, size_t capacity)
@@ -207,7 +229,7 @@ int pn_list_add(pn_list_t *list, void *value)
   assert(list);
   pn_list_ensure(list, list->size + 1);
   list->elements[list->size++] = value;
-  if (list->options & PN_REFCOUNT) pn_incref(value);
+  if (list->options & PN_REFCOUNT) pn_incref2(value, list);
   return 0;
 }
 
@@ -242,7 +264,7 @@ void pn_list_del(pn_list_t *list, int index, int n)
 
   if (list->options & PN_REFCOUNT) {
     for (int i = 0; i < n; i++) {
-      pn_decref(list->elements[index + i]);
+      pn_decref2(list->elements[index + i], list);
     }
   }
 
@@ -294,7 +316,7 @@ static void pn_list_finalize(void *object)
   assert(object);
   pn_list_t *list = (pn_list_t *) object;
   for (size_t i = 0; i < list->size; i++) {
-    if (list->options & PN_REFCOUNT) pn_decref(pn_list_get(list, i));
+    if (list->options & PN_REFCOUNT) pn_decref2(pn_list_get(list, i), list);
   }
   free(list->elements);
 }
@@ -354,7 +376,7 @@ static int pn_list_inspect(void *obj, pn_string_t *dst)
 
 pn_list_t *pn_list(size_t capacity, int options)
 {
-  static pn_class_t clazz = PN_CLASS(pn_list);
+  static const pn_class_t clazz = PN_CLASS(pn_list);
 
   pn_list_t *list = (pn_list_t *) pn_new(sizeof(pn_list_t), &clazz);
   list->capacity = capacity ? capacity : 16;
@@ -380,11 +402,12 @@ struct pn_map_t {
   size_t capacity;
   size_t addressable;
   size_t size;
-  float load_factor;
   uintptr_t (*hashcode)(void *key);
   bool (*equals)(void *a, void *b);
+  float load_factor;
   bool count_keys;
   bool count_values;
+  bool inspect_keys;
 };
 
 static void pn_map_finalize(void *object)
@@ -394,8 +417,8 @@ static void pn_map_finalize(void *object)
   if (map->count_keys || map->count_values) {
     for (size_t i = 0; i < map->capacity; i++) {
       if (map->entries[i].state != PNI_ENTRY_FREE) {
-        if (map->count_keys) pn_decref(map->entries[i].key);
-        if (map->count_values) pn_decref(map->entries[i].value);
+        if (map->count_keys) pn_decref2(map->entries[i].key, map);
+        if (map->count_values) pn_decref2(map->entries[i].value, map);
       }
     }
   }
@@ -447,7 +470,11 @@ static int pn_map_inspect(void *obj, pn_string_t *dst)
       err = pn_string_addf(dst, ", ");
       if (err) return err;
     }
-    err = pn_inspect(pn_map_key(map, entry), dst);
+    if (map->inspect_keys) {
+      err = pn_inspect(pn_map_key(map, entry), dst);
+    } else {
+      err = pn_string_addf(dst, "%p", pn_map_key(map, entry));
+    }
     if (err) return err;
     err = pn_string_addf(dst, ": ");
     if (err) return err;
@@ -463,7 +490,7 @@ static int pn_map_inspect(void *obj, pn_string_t *dst)
 
 pn_map_t *pn_map(size_t capacity, float load_factor, int options)
 {
-  static pn_class_t clazz = PN_CLASS(pn_map);
+  static const pn_class_t clazz = PN_CLASS(pn_map);
 
   pn_map_t *map = (pn_map_t *) pn_new(sizeof(pn_map_t), &clazz);
   map->capacity = capacity ? capacity : 16;
@@ -474,6 +501,7 @@ pn_map_t *pn_map(size_t capacity, float load_factor, int options)
   map->equals = pn_equals;
   map->count_keys = (options & PN_REFCOUNT) || (options & PN_REFCOUNT_KEY);
   map->count_values = (options & PN_REFCOUNT) || (options & PN_REFCOUNT_VALUE);
+  map->inspect_keys = true;
   pni_map_allocate(map);
   return map;
 }
@@ -492,7 +520,7 @@ static float pni_map_load(pn_map_t *map)
 static bool pni_map_ensure(pn_map_t *map, size_t capacity)
 {
   float load = pni_map_load(map);
-  if (capacity <= map->capacity && load < map->load_factor) {
+  if (capacity <= map->capacity && load <= map->load_factor) {
     return false;
   }
 
@@ -511,8 +539,8 @@ static bool pni_map_ensure(pn_map_t *map, size_t capacity)
       void *key = entries[i].key;
       void *value = entries[i].value;
       pn_map_put(map, key, value);
-      if (map->count_keys) pn_decref(key);
-      if (map->count_values) pn_decref(value);
+      if (map->count_keys) pn_decref2(key, map);
+      if (map->count_values) pn_decref2(value, map);
     }
   }
 
@@ -531,7 +559,7 @@ static pni_entry_t *pni_map_entry(pn_map_t *map, void *key, pni_entry_t **pprev,
     if (create) {
       entry->state = PNI_ENTRY_TAIL;
       entry->key = key;
-      if (map->count_keys) pn_incref(key);
+      if (map->count_keys) pn_incref2(key, map);
       map->size++;
       return entry;
     } else {
@@ -571,7 +599,7 @@ static pni_entry_t *pni_map_entry(pn_map_t *map, void *key, pni_entry_t **pprev,
     entry->state = PNI_ENTRY_LINK;
     map->entries[empty].state = PNI_ENTRY_TAIL;
     map->entries[empty].key = key;
-    if (map->count_keys) pn_incref(key);
+    if (map->count_keys) pn_incref2(key, map);
     if (pprev) *pprev = entry;
     map->size++;
     return &map->entries[empty];
@@ -584,9 +612,9 @@ int pn_map_put(pn_map_t *map, void *key, void *value)
 {
   assert(map);
   pni_entry_t *entry = pni_map_entry(map, key, NULL, true);
-  if (map->count_values) pn_decref(entry->value);
+  if (map->count_values) pn_decref2(entry->value, map);
   entry->value = value;
-  if (map->count_values) pn_incref(value);
+  if (map->count_values) pn_incref2(value, map);
   return 0;
 }
 
@@ -603,17 +631,24 @@ void pn_map_del(pn_map_t *map, void *key)
   pni_entry_t *prev = NULL;
   pni_entry_t *entry = pni_map_entry(map, key, &prev, false);
   if (entry) {
+    void *dref_key = (map->count_keys) ? entry->key : NULL;
+    void *dref_value = (map->count_values) ? entry->value : NULL;
     if (prev) {
       prev->next = entry->next;
       prev->state = entry->state;
+    } else if (entry->next) {
+      assert(entry->state == PNI_ENTRY_LINK);
+      pni_entry_t *next = &map->entries[entry->next];
+      *entry = *next;
+      entry = next;
     }
     entry->state = PNI_ENTRY_FREE;
     entry->next = 0;
-    if (map->count_keys) pn_decref(entry->key);
     entry->key = NULL;
-    if (map->count_values) pn_decref(entry->value);
     entry->value = NULL;
     map->size--;
+    if (dref_key) pn_decref2(dref_key, map);
+    if (dref_value) pn_decref2(dref_value, map);
   }
 }
 
@@ -676,6 +711,7 @@ pn_hash_t *pn_hash(size_t capacity, float load_factor, int options)
   hash->map.equals = pni_identity_equals;
   hash->map.count_keys = false;
   hash->map.count_values = options & PN_REFCOUNT;
+  hash->map.inspect_keys = false;
   return hash;
 }
 
@@ -796,7 +832,7 @@ pn_string_t *pn_string(const char *bytes)
 
 pn_string_t *pn_stringn(const char *bytes, size_t n)
 {
-  static pn_class_t clazz = PN_CLASS(pn_string);
+  static const pn_class_t clazz = PN_CLASS(pn_string);
   pn_string_t *string = (pn_string_t *) pn_new(sizeof(pn_string_t), &clazz);
   string->capacity = n ? n * sizeof(char) : 16;
   string->bytes = (char *) malloc(string->capacity);
@@ -985,7 +1021,7 @@ static void pn_iterator_finalize(void *object)
 
 pn_iterator_t *pn_iterator()
 {
-  static pn_class_t clazz = PN_CLASS(pn_iterator);
+  static const pn_class_t clazz = PN_CLASS(pn_iterator);
   pn_iterator_t *it = (pn_iterator_t *) pn_new(sizeof(pn_iterator_t), &clazz);
   return it;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/parser.c
----------------------------------------------------------------------
diff --git a/proton-c/src/parser.c b/proton-c/src/parser.c
index 6766607..fccba75 100644
--- a/proton-c/src/parser.c
+++ b/proton-c/src/parser.c
@@ -29,10 +29,10 @@
 
 struct pn_parser_t {
   pn_scanner_t *scanner;
-  int error_code;
   char *atoms;
   size_t size;
   size_t capacity;
+  int error_code;
 };
 
 pn_parser_t *pn_parser()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/platform.h
----------------------------------------------------------------------
diff --git a/proton-c/src/platform.h b/proton-c/src/platform.h
index 6b63e2e..b0475e0 100644
--- a/proton-c/src/platform.h
+++ b/proton-c/src/platform.h
@@ -85,8 +85,10 @@ int pn_i_vsnprintf(char *buf, size_t count, const char *fmt, va_list ap);
 #endif
 
 #if defined _MSC_VER || defined _OPENVMS
+#if !defined(va_copy)
 #define va_copy(d,s) ((d) = (s))
 #endif
+#endif
 
 #ifdef __cplusplus
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/posix/driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/posix/driver.c b/proton-c/src/posix/driver.c
index 0ce8f9f..b128cb2 100644
--- a/proton-c/src/posix/driver.c
+++ b/proton-c/src/posix/driver.c
@@ -64,19 +64,19 @@ struct pn_driver_t {
   struct pollfd *fds;
   size_t nfds;
   int ctrl[2]; //pipe for updating selectable status
-  pn_trace_t trace;
   pn_timestamp_t wakeup;
+  pn_trace_t trace;
 };
 
 struct pn_listener_t {
   pn_driver_t *driver;
   pn_listener_t *listener_next;
   pn_listener_t *listener_prev;
+  void *context;
   int idx;
-  bool pending;
   int fd;
+  bool pending;
   bool closed;
-  void *context;
 };
 
 #define PN_NAME_MAX (256)
@@ -86,22 +86,22 @@ struct pn_connector_t {
   pn_connector_t *connector_next;
   pn_connector_t *connector_prev;
   char name[PN_NAME_MAX];
+  pn_timestamp_t wakeup;
+  pn_connection_t *connection;
+  pn_transport_t *transport;
+  pn_sasl_t *sasl;
+  pn_listener_t *listener;
+  void *context;
   int idx;
-  bool pending_tick;
-  bool pending_read;
-  bool pending_write;
   int fd;
   int status;
   pn_trace_t trace;
+  bool pending_tick;
+  bool pending_read;
+  bool pending_write;
   bool closed;
-  pn_timestamp_t wakeup;
-  pn_connection_t *connection;
-  pn_transport_t *transport;
-  pn_sasl_t *sasl;
   bool input_done;
   bool output_done;
-  pn_listener_t *listener;
-  void *context;
 };
 
 /* Impls */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/posix/io.c
----------------------------------------------------------------------
diff --git a/proton-c/src/posix/io.c b/proton-c/src/posix/io.c
index 11379ff..4fa223f 100644
--- a/proton-c/src/posix/io.c
+++ b/proton-c/src/posix/io.c
@@ -21,6 +21,7 @@
 
 #include <proton/io.h>
 #include <proton/object.h>
+#include <proton/selector.h>
 
 #include <ctype.h>
 #include <errno.h>
@@ -43,6 +44,7 @@ struct pn_io_t {
   char host[MAX_HOST];
   char serv[MAX_SERV];
   pn_error_t *error;
+  pn_selector_t *selector;
   bool wouldblock;
 };
 
@@ -51,6 +53,7 @@ void pn_io_initialize(void *obj)
   pn_io_t *io = (pn_io_t *) obj;
   io->error = pn_error();
   io->wouldblock = false;
+  io->selector = NULL;
 }
 
 void pn_io_finalize(void *obj)
@@ -65,7 +68,7 @@ void pn_io_finalize(void *obj)
 
 pn_io_t *pn_io(void)
 {
-  static pn_class_t clazz = PN_CLASS(pn_io);
+  static const pn_class_t clazz = PN_CLASS(pn_io);
   pn_io_t *io = (pn_io_t *) pn_new(sizeof(pn_io_t), &clazz);
   return io;
 }
@@ -275,3 +278,10 @@ bool pn_wouldblock(pn_io_t *io)
 {
   return io->wouldblock;
 }
+
+pn_selector_t *pn_io_selector(pn_io_t *io)
+{
+  if (io->selector == NULL)
+    io->selector = pni_selector();
+  return io->selector;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/posix/selector.c
----------------------------------------------------------------------
diff --git a/proton-c/src/posix/selector.c b/proton-c/src/posix/selector.c
index 1c42d99..14a97ee 100644
--- a/proton-c/src/posix/selector.c
+++ b/proton-c/src/posix/selector.c
@@ -65,9 +65,9 @@ void pn_selector_finalize(void *obj)
 #define pn_selector_compare NULL
 #define pn_selector_inspect NULL
 
-pn_selector_t *pn_selector(void)
+pn_selector_t *pni_selector(void)
 {
-  static pn_class_t clazz = PN_CLASS(pn_selector);
+  static const pn_class_t clazz = PN_CLASS(pn_selector);
   pn_selector_t *selector = (pn_selector_t *) pn_new(sizeof(pn_selector_t), &clazz);
   return selector;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/protocol.h.py
----------------------------------------------------------------------
diff --git a/proton-c/src/protocol.h.py b/proton-c/src/protocol.h.py
index 0ff7aa4..76a2391 100644
--- a/proton-c/src/protocol.h.py
+++ b/proton-c/src/protocol.h.py
@@ -24,6 +24,7 @@ print "/* generated */"
 print "#ifndef _PROTON_PROTOCOL_H"
 print "#define _PROTON_PROTOCOL_H 1"
 print
+print "#include \"proton/type_compat.h\""
 
 fields = {}
 
@@ -45,36 +46,99 @@ for type in TYPES:
   fields[code] = (type["@name"], [f["@name"] for f in type.query["field"]])
   idx += 1
 
-print
+print """
+#include <stddef.h>
+
+typedef struct {
+  const unsigned char name_index;
+  const unsigned char first_field_index;
+  const unsigned char field_count;
+} pn_fields_t;
 
-print """typedef struct {
-  const char *name;
-  const char *fields[32];
-} pn_fields_t;"""
+extern const pn_fields_t FIELDS[];
+extern const char * const FIELD_STRINGPOOL;
+extern const uint16_t FIELD_NAME[];
+extern const uint16_t FIELD_FIELDS[];
+extern const unsigned char FIELD_MIN;
+extern const unsigned char FIELD_MAX;
+"""
 
+print "#ifdef DEFINE_FIELDS"
+
+print 'struct FIELD_STRINGS {'
+print '  const char FIELD_STRINGS_NULL[sizeof("")];'
+strings = set()
+for name, fnames in fields.values():
+    strings.add(name)
+    strings.update(fnames)
+for str in strings:
+    istr = str.replace("-", "_");
+    print '  const char FIELD_STRINGS_%s[sizeof("%s")];' % (istr, str)
+print "};"
+print
+print 'const struct FIELD_STRINGS FIELD_STRINGS = {'
+print '  "",'
+for str in strings:
+    print '  "%s",'% str
+print "};"
+print 'const char * const FIELD_STRINGPOOL = (const char * const) &FIELD_STRINGS;'
 print
+print "/* This is an array of offsets into FIELD_STRINGPOOL */"
+print "const uint16_t FIELD_NAME[] = {"
+print "  offsetof(struct FIELD_STRINGS, FIELD_STRINGS_NULL),"
+index = 1
+for i in range(256):
+  if i in fields:
+    name, fnames = fields[i]
+    iname = name.replace("-", "_");
+    print '  offsetof(struct FIELD_STRINGS, FIELD_STRINGS_%s), /* %d */' % (iname, index)
+    index += 1
+print "};"
 
-print "#ifndef DEFINE_FIELDS"
-print "extern"
-print "#endif"
+print "/* This is an array of offsets into FIELD_STRINGPOOL */"
+print "const uint16_t FIELD_FIELDS[] = {"
+print "  offsetof(struct FIELD_STRINGS, FIELD_STRINGS_NULL),"
+index = 1
+for i in range(256):
+  if i in fields:
+    name, fnames = fields[i]
+    if fnames:
+      for f in fnames:
+        ifname = f.replace("-", "_");
+        print '  offsetof(struct FIELD_STRINGS, FIELD_STRINGS_%s), /* %d (%s) */' % (ifname, index, name)
+        index += 1
+print "};"
 
-print "pn_fields_t FIELDS[256]"
-print "#ifdef DEFINE_FIELDS"
-print " = {"
+print "const pn_fields_t FIELDS[] = {"
 
+name_count = 1
+field_count = 1
+field_min = 256
+field_max = 0
 for i in range(256):
   if i in fields:
+    if i>field_max: field_max = i
+    if i<field_min: field_min = i
+
+for i in range(field_min, field_max+1):
+  if i in fields:
     name, fnames = fields[i]
     if fnames:
-      print '  {"%s", {%s}},' % (name, ", ".join(['"%s"' % f for f in fnames]))
+      print '  {%d, %d, %d}, /* %d (%s) */' % (name_count, field_count, len(fnames), i, name)
+      field_count += len(fnames)
     else:
-      print '  {"%s", {NULL}},' % name
+      print '  {%d, 0, 0}, /* %d (%s) */' % (name_count, i, name)
+    name_count += 1
+    if i>field_max: field_max = i
+    if i<field_min: field_min = i
   else:
-    print '  {NULL, {NULL}},'
+    print '  {0, 0, 0}, /* %d */' % i
 
-print "}"
+print "};"
+print
+print 'const unsigned char FIELD_MIN = %d;' % field_min
+print 'const unsigned char FIELD_MAX = %d;' % field_max
+print
 print "#endif"
-print ";"
-
 print
 print "#endif /* protocol.h */"

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/proton-dump.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proton-dump.c b/proton-c/src/proton-dump.c
index c7216c3..4e8a04f 100644
--- a/proton-c/src/proton-dump.c
+++ b/proton-c/src/proton-dump.c
@@ -19,6 +19,8 @@
  *
  */
 
+#include "pncompat/misc_funcs.inc"
+
 #include <stdio.h>
 #include <proton/buffer.h>
 #include <proton/codec.h>
@@ -98,8 +100,34 @@ int dump(const char *file)
   return 0;
 }
 
+void usage(char* prog) {
+  printf("Usage: %s [FILE1] [FILEn] ...\n", prog);
+  printf("Displays the content of an AMQP dump file containing frame data.\n");
+  printf("\n  [FILEn]  Dump file to be displayed.\n\n");
+}
+
 int main(int argc, char **argv)
 {
+  if(argc == 1) {
+    usage(argv[0]);
+    return 0;
+  }
+
+  int c;
+
+  while ( (c = getopt(argc, argv, "h")) != -1 ) {
+    switch(c) {
+    case 'h':
+      usage(argv[0]);
+      return 0;
+      break;
+
+    case '?':
+      usage(argv[0]);
+      return 1;
+    }
+  }
+
   for (int i = 1; i < argc; i++) {
     int err = dump(argv[i]);
     if (err) return err;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/proton.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proton.c b/proton-c/src/proton.c
index 9cd44ef..2b7d313 100644
--- a/proton-c/src/proton.c
+++ b/proton-c/src/proton.c
@@ -22,12 +22,12 @@
 #if defined(_WIN32) && ! defined(__CYGWIN__)
 #define NOGDI
 #include <winsock2.h>
-#include "pncompat/misc_funcs.inc"
 #else
 #include <unistd.h>
-#include <libgen.h>
 #endif
 
+#include "pncompat/misc_funcs.inc"
+
 #include <stdio.h>
 #include <string.h>
 #include <proton/driver.h>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/sasl/sasl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c
index 3f4f536..9c7ba1e 100644
--- a/proton-c/src/sasl/sasl.c
+++ b/proton-c/src/sasl/sasl.c
@@ -19,6 +19,7 @@
  *
  */
 
+#include <assert.h>
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
@@ -27,29 +28,28 @@
 #include <proton/engine.h> // XXX: just needed for PN_EOS
 #include <proton/sasl.h>
 #include "protocol.h"
+#include "dispatch_actions.h"
 #include "../dispatcher/dispatcher.h"
 #include "../engine/engine-internal.h"
 #include "../util.h"
 
-#define SCRATCH (1024)
 
 struct pn_sasl_t {
   pn_transport_t *transport;
   pn_io_layer_t *io_layer;
-  size_t header_count;
   pn_dispatcher_t *disp;
-  bool client;
-  bool configured;
   char *mechanisms;
   char *remote_mechanisms;
   pn_buffer_t *send_data;
   pn_buffer_t *recv_data;
   pn_sasl_outcome_t outcome;
+  bool client;
+  bool configured;
+  bool allow_skip;
   bool sent_init;
   bool rcvd_init;
   bool sent_done;
   bool rcvd_done;
-  char scratch[SCRATCH];
 };
 
 static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available);
@@ -57,12 +57,6 @@ static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, si
 static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t available);
 static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t available);
 
-int pn_do_init(pn_dispatcher_t *disp);
-int pn_do_mechanisms(pn_dispatcher_t *disp);
-int pn_do_challenge(pn_dispatcher_t *disp);
-int pn_do_response(pn_dispatcher_t *disp);
-int pn_do_outcome(pn_dispatcher_t *disp);
-
 pn_sasl_t *pn_sasl(pn_transport_t *transport)
 {
   if (!transport->sasl) {
@@ -70,12 +64,6 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport)
     sasl->disp = pn_dispatcher(1, transport);
     sasl->disp->batch = false;
 
-    pn_dispatcher_action(sasl->disp, SASL_INIT, pn_do_init);
-    pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, pn_do_mechanisms);
-    pn_dispatcher_action(sasl->disp, SASL_CHALLENGE, pn_do_challenge);
-    pn_dispatcher_action(sasl->disp, SASL_RESPONSE, pn_do_response);
-    pn_dispatcher_action(sasl->disp, SASL_OUTCOME, pn_do_outcome);
-
     sasl->client = false;
     sasl->configured = false;
     sasl->mechanisms = NULL;
@@ -83,6 +71,7 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport)
     sasl->send_data = pn_buffer(16);
     sasl->recv_data = pn_buffer(16);
     sasl->outcome = PN_SASL_NONE;
+    sasl->allow_skip = false;
     sasl->sent_init = false;
     sasl->rcvd_init = false;
     sasl->sent_done = false;
@@ -95,8 +84,6 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport)
     sasl->io_layer->process_input = pn_input_read_sasl_header;
     sasl->io_layer->process_output = pn_output_write_sasl_header;
     sasl->io_layer->process_tick = pn_io_layer_tick_passthru;
-
-    sasl->header_count = 0;
   }
 
   return transport->sasl;
@@ -185,6 +172,12 @@ void pn_sasl_server(pn_sasl_t *sasl)
   }
 }
 
+void pn_sasl_allow_skip(pn_sasl_t *sasl, bool allow)
+{
+  if (sasl)
+    sasl->allow_skip = allow;
+}
+
 void pn_sasl_plain(pn_sasl_t *sasl, const char *username, const char *password)
 {
   if (!sasl) return;
@@ -238,7 +231,7 @@ void pn_sasl_free(pn_sasl_t *sasl)
 
 void pn_client_init(pn_sasl_t *sasl)
 {
-  pn_bytes_t bytes = pn_buffer_bytes(sasl->send_data);
+  pn_buffer_memory_t bytes = pn_buffer_memory(sasl->send_data);
   pn_post_frame(sasl->disp, 0, "DL[sz]", SASL_INIT, sasl->mechanisms,
                 bytes.size, bytes.start);
   pn_buffer_clear(sasl->send_data);
@@ -294,7 +287,7 @@ void pn_sasl_process(pn_sasl_t *sasl)
   }
 
   if (pn_buffer_size(sasl->send_data)) {
-    pn_bytes_t bytes = pn_buffer_bytes(sasl->send_data);
+    pn_buffer_memory_t bytes = pn_buffer_memory(sasl->send_data);
     pn_post_frame(sasl->disp, 0, "DL[z]", sasl->client ? SASL_RESPONSE : SASL_CHALLENGE,
                   bytes.size, bytes.start);
     pn_buffer_clear(sasl->send_data);
@@ -408,27 +401,44 @@ int pn_do_outcome(pn_dispatcher_t *disp)
 }
 
 #define SASL_HEADER ("AMQP\x03\x01\x00\x00")
+#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
 #define SASL_HEADER_LEN 8
 
 static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available)
 {
   pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
-  const char *point = SASL_HEADER + sasl->header_count;
-  int delta = pn_min(available, SASL_HEADER_LEN - sasl->header_count);
-  if (!available || memcmp(bytes, point, delta)) {
-    char quoted[1024];
-    pn_quote_data(quoted, 1024, bytes, available);
-    return pn_error_format(sasl->transport->error, PN_ERR,
-                           "%s header mismatch: '%s'", "SASL", quoted);
-  } else {
-    sasl->header_count += delta;
-    if (sasl->header_count == SASL_HEADER_LEN) {
-      sasl->io_layer->process_input = pn_input_read_sasl;
-      if (sasl->disp->trace & PN_TRACE_FRM)
-        pn_transport_logf(sasl->transport, "  <- %s", "SASL");
+  if (available > 0) {
+    if (available < SASL_HEADER_LEN) {
+      if (memcmp(bytes, SASL_HEADER, available) == 0 ||
+          memcmp(bytes, AMQP_HEADER, available) == 0)
+        return 0;
+    } else {
+      if (memcmp(bytes, SASL_HEADER, SASL_HEADER_LEN) == 0) {
+        sasl->io_layer->process_input = pn_input_read_sasl;
+        if (sasl->disp->trace & PN_TRACE_FRM)
+          pn_transport_logf(sasl->transport, "  <- %s", "SASL");
+        return SASL_HEADER_LEN;
+      }
+      if (memcmp(bytes, AMQP_HEADER, SASL_HEADER_LEN) == 0) {
+        if (sasl->allow_skip) {
+          sasl->outcome = PN_SASL_SKIPPED;
+          sasl->io_layer->process_input = pn_io_layer_input_passthru;
+          sasl->io_layer->process_output = pn_io_layer_output_passthru;
+          pn_io_layer_t *io_next = sasl->io_layer->next;
+          return io_next->process_input( io_next, bytes, available );
+        } else {
+            pn_do_error(sasl->transport, "amqp:connection:policy-error",
+                        "Client skipped SASL exchange - forbidden");
+            return PN_EOS;
+        }
+      }
     }
-    return delta;
   }
+  char quoted[1024];
+  pn_quote_data(quoted, 1024, bytes, available);
+  pn_do_error(sasl->transport, "amqp:connection:framing-error",
+              "%s header mismatch: '%s'", "SASL", quoted);
+  return PN_EOS;
 }
 
 static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available)
@@ -448,13 +458,10 @@ static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes,
   pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
   if (sasl->disp->trace & PN_TRACE_FRM)
     pn_transport_logf(sasl->transport, "  -> %s", "SASL");
-  if (size >= SASL_HEADER_LEN) {
-    memmove(bytes, SASL_HEADER, SASL_HEADER_LEN);
-    sasl->io_layer->process_output = pn_output_write_sasl;
-    return SASL_HEADER_LEN;
-  } else {
-    return pn_error_format(sasl->transport->error, PN_UNDERFLOW, "underflow writing %s header", "SASL");
-  }
+  assert(size >= SASL_HEADER_LEN);
+  memmove(bytes, SASL_HEADER, SASL_HEADER_LEN);
+  sasl->io_layer->process_output = pn_output_write_sasl;
+  return SASL_HEADER_LEN;
 }
 
 static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/selectable.c
----------------------------------------------------------------------
diff --git a/proton-c/src/selectable.c b/proton-c/src/selectable.c
index 97283b4..1f39a38 100644
--- a/proton-c/src/selectable.c
+++ b/proton-c/src/selectable.c
@@ -90,7 +90,7 @@ pn_selectable_t *pni_selectable(ssize_t (*capacity)(pn_selectable_t *),
                                 void (*expired)(pn_selectable_t *),
                                 void (*finalize)(pn_selectable_t *))
 {
-  static pn_class_t clazz = PN_CLASS(pn_selectable);
+  static const pn_class_t clazz = PN_CLASS(pn_selectable);
   pn_selectable_t *selectable = (pn_selectable_t *) pn_new(sizeof(pn_selectable_t), &clazz);
   selectable->capacity = capacity;
   selectable->pending = pending;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/ssl/openssl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/ssl/openssl.c b/proton-c/src/ssl/openssl.c
index 5815845..db3af4c 100644
--- a/proton-c/src/ssl/openssl.c
+++ b/proton-c/src/ssl/openssl.c
@@ -49,23 +49,25 @@ typedef enum { UNKNOWN_CONNECTION, SSL_CONNECTION, CLEAR_CONNECTION } connection
 typedef struct pn_ssl_session_t pn_ssl_session_t;
 
 struct pn_ssl_domain_t {
-  int   ref_count;
 
   SSL_CTX       *ctx;
-  pn_ssl_mode_t mode;
 
-  bool has_ca_db;       // true when CA database configured
-  bool has_certificate; // true when certificate configured
   char *keyfile_pw;
 
   // settings used for all connections
   char *trusted_CAs;
-  pn_ssl_verify_mode_t verify_mode;
-  bool allow_unsecured;
 
   // session cache
   pn_ssl_session_t *ssn_cache_head;
   pn_ssl_session_t *ssn_cache_tail;
+
+  int   ref_count;
+  pn_ssl_mode_t mode;
+  pn_ssl_verify_mode_t verify_mode;
+
+  bool has_ca_db;       // true when CA database configured
+  bool has_certificate; // true when certificate configured
+  bool allow_unsecured;
 };
 
 
@@ -81,24 +83,25 @@ struct pn_ssl_t {
   BIO *bio_ssl;         // i/o from/to SSL socket layer
   BIO *bio_ssl_io;      // SSL "half" of network-facing BIO
   BIO *bio_net_io;      // socket-side "half" of network-facing BIO
-  bool ssl_shutdown;    // BIO_ssl_shutdown() called on socket.
-  bool ssl_closed;      // shutdown complete, or SSL error
-  ssize_t app_input_closed;   // error code returned by upper layer process input
-  ssize_t app_output_closed;  // error code returned by upper layer process output
-
-  bool read_blocked;    // SSL blocked until more network data is read
-  bool write_blocked;   // SSL blocked until data is written to network
-
   // buffers for holding I/O from "applications" above SSL
 #define APP_BUF_SIZE    (4*1024)
   char *outbuf;
+  char *inbuf;
+
+  ssize_t app_input_closed;   // error code returned by upper layer process input
+  ssize_t app_output_closed;  // error code returned by upper layer process output
+
   size_t out_size;
   size_t out_count;
-  char *inbuf;
   size_t in_size;
   size_t in_count;
 
   pn_trace_t trace;
+
+  bool ssl_shutdown;    // BIO_ssl_shutdown() called on socket.
+  bool ssl_closed;      // shutdown complete, or SSL error
+  bool read_blocked;    // SSL blocked until more network data is read
+  bool write_blocked;   // SSL blocked until data is written to network
 };
 
 struct pn_ssl_session_t {
@@ -184,7 +187,7 @@ static int ssl_failed(pn_ssl_t *ssl)
 {
     SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN);
   ssl->ssl_closed = true;
-  ssl->app_input_closed = ssl->app_output_closed = PN_ERR;
+  ssl->app_input_closed = ssl->app_output_closed = PN_EOS;
   // fake a shutdown so the i/o processing code will close properly
   SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN);
   // try to grab the first SSL error to add to the failure log
@@ -195,7 +198,8 @@ static int ssl_failed(pn_ssl_t *ssl)
   }
   _log_ssl_error(NULL);    // spit out any remaining errors to the log file
   ssl->transport->tail_closed = true;
-  return pn_error_format( ssl->transport->error, PN_ERR, "SSL Failure: %s", buf );
+  pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", buf);
+  return PN_EOS;
 }
 
 /* match the DNS name pattern from the peer certificate against our configured peer
@@ -343,7 +347,7 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
 // "openssl dhparam -C -2 2048"
 static DH *get_dh2048(void)
 {
-  static unsigned char dh2048_p[]={
+  static const unsigned char dh2048_p[]={
     0xAE,0xF7,0xE9,0x66,0x26,0x7A,0xAC,0x0A,0x6F,0x1E,0xCD,0x81,
     0xBD,0x0A,0x10,0x7E,0xFA,0x2C,0xF5,0x2D,0x98,0xD4,0xE7,0xD9,
     0xE4,0x04,0x8B,0x06,0x85,0xF2,0x0B,0xA3,0x90,0x15,0x56,0x0C,
@@ -367,7 +371,7 @@ static DH *get_dh2048(void)
     0xA4,0xED,0xFD,0x49,0x0B,0xE3,0x4A,0xF6,0x28,0xB3,0x98,0xB0,
     0x23,0x1C,0x09,0x33,
   };
-  static unsigned char dh2048_g[]={
+  static const unsigned char dh2048_g[]={
     0x02,
   };
   DH *dh;
@@ -807,7 +811,7 @@ static int setup_ssl_connection( pn_ssl_t *ssl )
 static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t available)
 {
   pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
-  if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR;
+  if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
 
   _log( ssl, "process_input_ssl( data size=%d )\n",available );
 
@@ -904,14 +908,13 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
             if (!max_frame) max_frame = ssl->in_size * 2;  // no limit
             if (ssl->in_size < max_frame) {
               // no max frame limit - grow it.
-              char *newbuf = (char *)malloc( max_frame );
+              size_t newsize = pn_min(max_frame, ssl->in_size * 2);
+              char *newbuf = (char *)realloc( ssl->inbuf, newsize );
               if (newbuf) {
-                ssl->in_size = max_frame;
-                memmove( newbuf, ssl->inbuf, ssl->in_count );
-                free( ssl->inbuf );
+                ssl->in_size = newsize;
                 ssl->inbuf = newbuf;
+                work_pending = true;  // can we get more input?
               }
-              work_pending = true;  // can we get more input?
             } else {
               // can't gather any more input, but app needs more?
               // This is a bug - since SSL can buffer up to max-frame,
@@ -951,8 +954,8 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
 static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len)
 {
   pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
-  if (!ssl) return PN_ERR;
-  if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR;
+  if (!ssl) return PN_EOS;
+  if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
 
   ssize_t written = 0;
   bool work_pending;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/tests/object.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/object.c b/proton-c/src/tests/object.c
index 228370f..f6d11cf 100644
--- a/proton-c/src/tests/object.c
+++ b/proton-c/src/tests/object.c
@@ -95,7 +95,7 @@ static intptr_t delta(void *a, void *b) { return (uintptr_t) b - (uintptr_t) a;
 
 static pn_class_t null_class = {0};
 
-static pn_class_t noop_class = {noop, noop, zero, delta};
+static pn_class_t noop_class = {NULL, noop, noop, zero, delta};
 
 static void test_new(size_t size, pn_class_t *clazz)
 {
@@ -119,7 +119,7 @@ static void finalizer(void *object)
 
 static void test_finalize(void)
 {
-  static pn_class_t clazz = {NULL, finalizer};
+  static pn_class_t clazz = {NULL, NULL, finalizer};
 
   int **obj = (int **) pn_new(sizeof(int **), &clazz);
   assert(obj);
@@ -141,7 +141,7 @@ static uintptr_t hashcode(void *obj) { return (uintptr_t) obj; }
 
 static void test_hashcode(void)
 {
-  static pn_class_t clazz = {NULL, NULL, hashcode};
+  static pn_class_t clazz = {NULL, NULL, NULL, hashcode};
   void *obj = pn_new(0, &clazz);
   assert(obj);
   assert(pn_hashcode(obj) == (uintptr_t) obj);
@@ -151,7 +151,7 @@ static void test_hashcode(void)
 
 static void test_compare(void)
 {
-  static pn_class_t clazz = {NULL, NULL, NULL, delta};
+  static pn_class_t clazz = {NULL, NULL, NULL, NULL, delta};
 
   void *a = pn_new(0, &clazz);
   assert(a);
@@ -499,6 +499,50 @@ static void test_hash(void)
   pn_decref(three);
 }
 
+
+// collider class: all objects have same hash, no two objects compare equal
+static intptr_t collider_compare(void *a, void *b)
+{
+  if (a == b) return 0;
+  return (a > b) ? 1 : -1;
+}
+
+static uintptr_t collider_hashcode(void *obj)
+{
+  return 23;
+}
+
+#define collider_initialize NULL
+#define collider_finalize NULL
+#define collider_inspect NULL
+
+static void test_map_links(void)
+{
+  const pn_class_t collider_clazz = PN_CLASS(collider);
+  void *keys[3];
+  for (int i = 0; i < 3; i++)
+    keys[i] = pn_new(0, &collider_clazz);
+
+  // test deleting a head, middle link, tail
+
+  for (int delete_idx=0; delete_idx < 3; delete_idx++) {
+    pn_map_t *map = pn_map(0, 0.75, 0);
+    // create a chain of entries that have same head (from identical key hashcode)
+    for (int i = 0; i < 3; i++) {
+      pn_map_put(map, keys[i], keys[i]);
+    }
+    pn_map_del(map, keys[delete_idx]);
+    for (int i = 0; i < 3; i++) {
+      void *value = (i == delete_idx) ? NULL : keys[i];
+      assert (pn_map_get(map, keys[i]) == value);
+    }
+    pn_free(map);
+  }
+  for (int i = 0; i < 3; i++)
+    pn_free(keys[i]);
+}
+
+
 static bool equals(const char *a, const char *b)
 {
   if (a == NULL && b == NULL) {
@@ -792,6 +836,7 @@ int main(int argc, char **argv)
   test_list_index();
 
   test_map();
+  test_map_links();
 
   test_hash();
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/tests/parse-url.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/parse-url.c b/proton-c/src/tests/parse-url.c
index 4489ab2..f57e739 100644
--- a/proton-c/src/tests/parse-url.c
+++ b/proton-c/src/tests/parse-url.c
@@ -64,6 +64,8 @@ static bool test_url_parse(const char* url0, const char* scheme0, const char* us
 
 int main(int argc, char **argv)
 {
+  assert(test_url_parse("", 0, 0, 0, "", 0, 0));
+  assert(test_url_parse("/Foo.bar:90087@somewhere", 0, 0, 0, "", 0, "Foo.bar:90087@somewhere"));
   assert(test_url_parse("host", 0, 0, 0, "host", 0, 0));
   assert(test_url_parse("host:423", 0, 0, 0, "host", "423", 0));
   assert(test_url_parse("user@host", 0, "user", 0, "host", 0, 0));
@@ -96,5 +98,10 @@ int main(int argc, char **argv)
   assert(test_url_parse("us%2fer:password@host", 0, "us/er", "password", "host", 0, 0));
   assert(test_url_parse("us%2Fer:password@host", 0, "us/er", "password", "host", 0, 0));
   assert(test_url_parse("user:pass%2fword%@host", 0, "user", "pass/word%", "host", 0, 0));
+  assert(test_url_parse("localhost/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", 0, 0, 0, "localhost", 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0"));
+  assert(test_url_parse("/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", 0, 0, 0, "", 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0"));
+  assert(test_url_parse("amqp://localhost/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", "amqp", 0, 0, "localhost", 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0"));
+  // Really perverse url
+  assert(test_url_parse("://:@://:", "", "", "", "", "", "/:"));
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index e5b4a31..1ac2876 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -24,6 +24,7 @@
 #include <string.h>
 #include <proton/framing.h>
 #include "protocol.h"
+#include "dispatch_actions.h"
 
 #include <assert.h>
 #include <stdarg.h>
@@ -42,7 +43,7 @@ static ssize_t transport_consume(pn_transport_t *transport);
 
 void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next)
 {
-  db->deliveries = pn_hash(1024, 0.75, PN_REFCOUNT);
+  db->deliveries = pn_hash(0, 0.75, PN_REFCOUNT);
   db->next = next;
 }
 
@@ -92,16 +93,6 @@ void pn_delivery_map_clear(pn_delivery_map_t *dm)
   }
 }
 
-int pn_do_open(pn_dispatcher_t *disp);
-int pn_do_begin(pn_dispatcher_t *disp);
-int pn_do_attach(pn_dispatcher_t *disp);
-int pn_do_transfer(pn_dispatcher_t *disp);
-int pn_do_flow(pn_dispatcher_t *disp);
-int pn_do_disposition(pn_dispatcher_t *disp);
-int pn_do_detach(pn_dispatcher_t *disp);
-int pn_do_end(pn_dispatcher_t *disp);
-int pn_do_close(pn_dispatcher_t *disp);
-
 static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available);
 static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available);
 static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t available);
@@ -150,16 +141,6 @@ static void pn_transport_initialize(void *object)
   amqp->buffered_input = NULL;
   amqp->next = NULL;
 
-  pn_dispatcher_action(transport->disp, OPEN, pn_do_open);
-  pn_dispatcher_action(transport->disp, BEGIN, pn_do_begin);
-  pn_dispatcher_action(transport->disp, ATTACH, pn_do_attach);
-  pn_dispatcher_action(transport->disp, TRANSFER, pn_do_transfer);
-  pn_dispatcher_action(transport->disp, FLOW, pn_do_flow);
-  pn_dispatcher_action(transport->disp, DISPOSITION, pn_do_disposition);
-  pn_dispatcher_action(transport->disp, DETACH, pn_do_detach);
-  pn_dispatcher_action(transport->disp, END, pn_do_end);
-  pn_dispatcher_action(transport->disp, CLOSE, pn_do_close);
-
   transport->open_sent = false;
   transport->open_rcvd = false;
   transport->close_sent = false;
@@ -178,11 +159,10 @@ static void pn_transport_initialize(void *object)
   transport->remote_idle_timeout = 0;
   transport->keepalive_deadline = 0;
   transport->last_bytes_output = 0;
-  transport->remote_offered_capabilities = pn_data(16);
-  transport->remote_desired_capabilities = pn_data(16);
-  transport->remote_properties = pn_data(16);
-  transport->disp_data = pn_data(16);
-  transport->error = pn_error();
+  transport->remote_offered_capabilities = pn_data(0);
+  transport->remote_desired_capabilities = pn_data(0);
+  transport->remote_properties = pn_data(0);
+  transport->disp_data = pn_data(0);
   pn_condition_init(&transport->remote_condition);
 
   transport->local_channels = pn_hash(0, 0.75, PN_REFCOUNT);
@@ -193,6 +173,8 @@ static void pn_transport_initialize(void *object)
 
   transport->input_pending = 0;
   transport->output_pending = 0;
+
+  transport->done_processing = false;
 }
 
 pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel)
@@ -200,14 +182,20 @@ pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel)
   return (pn_session_t *) pn_hash_get(transport->remote_channels, channel);
 }
 
-static void pn_map_channel(pn_transport_t *transport, uint16_t channel, pn_session_t *session)
+static void pni_map_remote_channel(pn_session_t *session, uint16_t channel)
 {
+  pn_transport_t *transport = session->connection->transport;
   pn_hash_put(transport->remote_channels, channel, session);
   session->state.remote_channel = channel;
 }
 
-void pn_unmap_channel(pn_transport_t *transport, pn_session_t *ssn)
+void pni_transport_unbind_handles(pn_hash_t *handles);
+
+static void pni_unmap_remote_channel(pn_session_t *ssn)
 {
+  // XXX: should really update link state also
+  pni_transport_unbind_handles(ssn->state.remote_handles);
+  pn_transport_t *transport = ssn->connection->transport;
   uint16_t channel = ssn->state.remote_channel;
   ssn->state.remote_channel = -2;
   // note: may free the session:
@@ -222,7 +210,7 @@ static void pn_transport_finalize(void *object);
 
 pn_transport_t *pn_transport()
 {
-  static pn_class_t clazz = PN_CLASS(pn_transport);
+  static const pn_class_t clazz = PN_CLASS(pn_transport);
   pn_transport_t *transport = (pn_transport_t *) pn_new(sizeof(pn_transport_t),
                                                         &clazz);
   if (!transport) return NULL;
@@ -265,7 +253,6 @@ static void pn_transport_finalize(void *object)
   pn_free(transport->remote_desired_capabilities);
   pn_free(transport->remote_properties);
   pn_free(transport->disp_data);
-  pn_error_free(transport->error);
   pn_condition_tini(&transport->remote_condition);
   pn_free(transport->local_channels);
   pn_free(transport->remote_channels);
@@ -281,22 +268,35 @@ int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
   if (connection->transport) return PN_STATE_ERR;
   transport->connection = connection;
   connection->transport = transport;
-  pn_incref(connection);
+  pn_incref2(connection, transport);
   if (transport->open_rcvd) {
     PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
-    pn_event_t *event = pn_collector_put(connection->collector,
-                                         PN_CONNECTION_REMOTE_STATE);
-    if (event) {
-      pn_event_init_connection(event, connection);
-    }
-    if (!pn_error_code(transport->error)) {
-      transport->disp->halt = false;
-      transport_consume(transport);        // blech - testBindAfterOpen
-    }
+    pn_collector_put(connection->collector, PN_CONNECTION_REMOTE_OPEN, connection);
+    transport->disp->halt = false;
+    transport_consume(transport);        // blech - testBindAfterOpen
   }
   return 0;
 }
 
+void pni_transport_unbind_handles(pn_hash_t *handles)
+{
+  for (pn_handle_t h = pn_hash_head(handles); h; h = pn_hash_next(handles, h)) {
+    uintptr_t key = pn_hash_key(handles, h);
+    pn_hash_del(handles, key);
+  }
+}
+
+void pni_transport_unbind_channels(pn_hash_t *channels)
+{
+  for (pn_handle_t h = pn_hash_head(channels); h; h = pn_hash_next(channels, h)) {
+    uintptr_t key = pn_hash_key(channels, h);
+    pn_session_t *ssn = (pn_session_t *) pn_hash_value(channels, h);
+    pni_transport_unbind_handles(ssn->state.local_handles);
+    pni_transport_unbind_handles(ssn->state.remote_handles);
+    pn_hash_del(channels, key);
+  }
+}
+
 int pn_transport_unbind(pn_transport_t *transport)
 {
   assert(transport);
@@ -305,6 +305,7 @@ int pn_transport_unbind(pn_transport_t *transport)
   pn_connection_t *conn = transport->connection;
   transport->connection = NULL;
 
+  // XXX: what happens if the endpoints are freed before we get here?
   pn_session_t *ssn = pn_session_head(conn, 0);
   while (ssn) {
     pn_delivery_map_clear(&ssn->state.incoming);
@@ -319,28 +320,31 @@ int pn_transport_unbind(pn_transport_t *transport)
     endpoint = endpoint->endpoint_next;
   }
 
+  pni_transport_unbind_channels(transport->local_channels);
+  pni_transport_unbind_channels(transport->remote_channels);
+
   pn_connection_unbound(conn);
-  pn_decref(conn);
+  pn_decref2(conn, transport);
   return 0;
 }
 
 pn_error_t *pn_transport_error(pn_transport_t *transport)
 {
-  return transport->error;
+  return NULL;
 }
 
-static void pn_map_handle(pn_session_t *ssn, uint32_t handle, pn_link_t *link)
+static void pni_map_remote_handle(pn_link_t *link, uint32_t handle)
 {
   link->state.remote_handle = handle;
-  pn_hash_put(ssn->state.remote_handles, handle, link);
+  pn_hash_put(link->session->state.remote_handles, handle, link);
 }
 
-void pn_unmap_handle(pn_session_t *ssn, pn_link_t *link)
+static void pni_unmap_remote_handle(pn_link_t *link)
 {
-  uint32_t handle = link->state.remote_handle;
+  uintptr_t handle = link->state.remote_handle;
   link->state.remote_handle = -2;
   // may delete link:
-  pn_hash_del(ssn->state.remote_handles, handle);
+  pn_hash_del(link->session->state.remote_handles, handle);
 }
 
 pn_link_t *pn_handle_state(pn_session_t *ssn, uint32_t handle)
@@ -392,13 +396,12 @@ void pni_disposition_encode(pn_disposition_t 
*disposition, pn_data_t *data)
   }
 }
 
-int pn_post_close(pn_transport_t *transport, const char *condition)
+int pn_post_close(pn_transport_t *transport, const char *condition, const char 
*description)
 {
   pn_condition_t *cond = NULL;
   if (transport->connection) {
     cond = pn_connection_condition(transport->connection);
   }
-  const char *description = NULL;
   pn_data_t *info = NULL;
   if (!condition && pn_condition_is_set(cond)) {
     condition = pn_condition_get_name(cond);
@@ -418,13 +421,16 @@ int pn_do_error(pn_transport_t *transport, const char 
*condition, const char *fm
   // XXX: result
   vsnprintf(buf, 1024, fmt, ap);
   va_end(ap);
-  pn_error_set(transport->error, PN_ERR, buf);
   if (!transport->close_sent) {
-    pn_post_close(transport, condition);
+    if (!transport->open_sent) {
+      pn_post_frame(transport->disp, 0, "DL[S]", OPEN, "");
+    }
+
+    pn_post_close(transport, condition, buf);
     transport->close_sent = true;
   }
   transport->disp->halt = true;
-  pn_transport_logf(transport, "ERROR %s %s", condition, 
pn_error_text(transport->error));
+  pn_transport_logf(transport, "ERROR %s %s", condition, buf);
   return PN_ERR;
 }
 
@@ -459,7 +465,6 @@ int pn_do_open(pn_dispatcher_t *disp)
     }
     disp->remote_max_frame = transport->remote_max_frame;
     pn_buffer_clear( disp->frame );
-    pn_buffer_ensure( disp->frame, disp->remote_max_frame );
   }
   if (container_q) {
     transport->remote_container = pn_bytes_strdup(remote_container);
@@ -474,12 +479,7 @@ int pn_do_open(pn_dispatcher_t *disp)
 
   if (conn) {
     PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
-
-    pn_event_t *event = pn_collector_put(conn->collector,
-                                         PN_CONNECTION_REMOTE_STATE);
-    if (event) {
-      pn_event_init_connection(event, conn);
-    }
+    pn_collector_put(conn->collector, PN_CONNECTION_REMOTE_OPEN, conn);
   } else {
     transport->disp->halt = true;
   }
@@ -506,15 +506,9 @@ int pn_do_begin(pn_dispatcher_t *disp)
     ssn = pn_session(transport->connection);
   }
   ssn->state.incoming_transfer_count = next;
-  pn_map_channel(transport, disp->channel, ssn);
+  pni_map_remote_channel(ssn, disp->channel);
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_ACTIVE);
-
-  pn_event_t *event = pn_collector_put(transport->connection->collector,
-                                       PN_SESSION_REMOTE_STATE);
-  if (event) {
-    pn_event_init_session(event, ssn);
-  }
-
+  pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_OPEN, 
ssn);
   return 0;
 }
 
@@ -537,18 +531,18 @@ pn_link_t *pn_find_link(pn_session_t *ssn, pn_bytes_t 
name, bool is_sender)
 static pn_expiry_policy_t symbol2policy(pn_bytes_t symbol)
 {
   if (!symbol.start)
-    return PN_SESSION_CLOSE;
+    return PN_EXPIRE_WITH_SESSION;
 
   if (!strncmp(symbol.start, "link-detach", symbol.size))
-    return PN_LINK_CLOSE;
+    return PN_EXPIRE_WITH_LINK;
   if (!strncmp(symbol.start, "session-end", symbol.size))
-    return PN_SESSION_CLOSE;
+    return PN_EXPIRE_WITH_SESSION;
   if (!strncmp(symbol.start, "connection-close", symbol.size))
-    return PN_CONNECTION_CLOSE;
+    return PN_EXPIRE_WITH_CONNECTION;
   if (!strncmp(symbol.start, "never", symbol.size))
-    return PN_NEVER;
+    return PN_EXPIRE_NEVER;
 
-  return PN_SESSION_CLOSE;
+  return PN_EXPIRE_WITH_SESSION;
 }
 
 static pn_distribution_mode_t symbol2dist_mode(const pn_bytes_t symbol)
@@ -613,6 +607,10 @@ int pn_do_attach(pn_dispatcher_t *disp)
   strname[name.size] = '\0';
 
   pn_session_t *ssn = pn_channel_state(transport, disp->channel);
+  if (!ssn) {
+      pn_do_error(transport, "amqp:connection:no-session", "attach without a 
session");
+      return PN_EOS;
+  }
   pn_link_t *link = pn_find_link(ssn, name, is_sender);
   if (!link) {
     if (is_sender) {
@@ -626,7 +624,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
     free(strheap);
   }
 
-  pn_map_handle(ssn, handle, link);
+  pni_map_remote_handle(link, handle);
   PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE);
   pn_terminus_t *rsrc = &link->remote_source;
   if (source.start || src_dynamic) {
@@ -684,12 +682,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
     link->state.delivery_count = idc;
   }
 
-  pn_event_t *event = pn_collector_put(transport->connection->collector,
-                                       PN_LINK_REMOTE_STATE);
-  if (event) {
-    pn_event_init_link(event, link);
-  }
-
+  pn_collector_put(transport->connection->collector, PN_LINK_REMOTE_OPEN, 
link);
   return 0;
 }
 
@@ -770,11 +763,7 @@ int pn_do_transfer(pn_dispatcher_t *disp)
     pn_post_flow(transport, ssn, link);
   }
 
-  pn_event_t *event = pn_collector_put(transport->connection->collector, 
PN_DELIVERY);
-  if (event) {
-    pn_event_init_delivery(event, delivery);
-  }
-
+  pn_collector_put(transport->connection->collector, PN_DELIVERY, delivery);
   return 0;
 }
 
@@ -824,10 +813,7 @@ int pn_do_flow(pn_dispatcher_t *disp)
       }
     }
 
-    pn_event_t *event = pn_collector_put(transport->connection->collector, 
PN_LINK_FLOW);
-    if (event) {
-      pn_event_init_link(event, link);
-    }
+    pn_collector_put(transport->connection->collector, PN_LINK_FLOW, link);
   }
 
   return 0;
@@ -922,10 +908,7 @@ int pn_do_disposition(pn_dispatcher_t *disp)
       delivery->updated = true;
       pn_work_update(transport->connection, delivery);
 
-      pn_event_t *event = pn_collector_put(transport->connection->collector, 
PN_DELIVERY);
-      if (event) {
-        pn_event_init_delivery(event, delivery);
-      }
+      pn_collector_put(transport->connection->collector, PN_DELIVERY, 
delivery);
     }
   }
 
@@ -955,16 +938,12 @@ int pn_do_detach(pn_dispatcher_t *disp)
   if (closed)
   {
     PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED);
-    pn_event_t *event = pn_collector_put(transport->connection->collector,
-                                         PN_LINK_REMOTE_STATE);
-    if (event) {
-      pn_event_init_link(event, link);
-    }
+    pn_collector_put(transport->connection->collector, PN_LINK_REMOTE_CLOSE, 
link);
   } else {
     // TODO: implement
   }
 
-  pn_unmap_handle(ssn, link);
+  pni_unmap_remote_handle(link);
   return 0;
 }
 
@@ -975,12 +954,8 @@ int pn_do_end(pn_dispatcher_t *disp)
   int err = pn_scan_error(disp->args, &ssn->endpoint.remote_condition, 
SCAN_ERROR_DEFAULT);
   if (err) return err;
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED);
-  pn_event_t *event = pn_collector_put(transport->connection->collector,
-                                       PN_SESSION_REMOTE_STATE);
-  if (event) {
-    pn_event_init_session(event, ssn);
-  }
-  pn_unmap_channel(transport, ssn);
+  pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_CLOSE, 
ssn);
+  pni_unmap_remote_channel(ssn);
   return 0;
 }
 
@@ -992,11 +967,7 @@ int pn_do_close(pn_dispatcher_t *disp)
   if (err) return err;
   transport->close_rcvd = true;
   PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED);
-  pn_event_t *event = pn_collector_put(transport->connection->collector,
-                                       PN_CONNECTION_REMOTE_STATE);
-  if (event) {
-    pn_event_init_connection(event, conn);
-  }
+  pn_collector_put(transport->connection->collector, 
PN_CONNECTION_REMOTE_CLOSE, conn);
   return 0;
 }
 
@@ -1043,11 +1014,7 @@ static ssize_t transport_consume(pn_transport_t 
*transport)
     } else if (n == 0) {
       break;
     } else {
-      if (n != PN_EOS) {
-        pn_transport_logf(transport, "ERROR[%i] %s\n",
-                          pn_error_code(transport->error),
-                          pn_error_text(transport->error));
-      }
+      assert(n == PN_EOS);
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
         pn_transport_log(transport, "  <- EOS");
       transport->input_pending = 0;  // XXX ???
@@ -1071,9 +1038,10 @@ static ssize_t pn_input_read_header(pn_transport_t 
*transport, const char *bytes
   if (!available || memcmp(bytes, point, delta)) {
     char quoted[1024];
     pn_quote_data(quoted, 1024, bytes, available);
-    return pn_error_format(transport->error, PN_ERR,
-                           "%s header mismatch: '%s'%s", protocol, quoted,
-                           available ? "" : " (connection aborted)");
+    pn_do_error(transport, "amqp:connection:framing-error",
+                "%s header mismatch: '%s'%s", protocol, quoted,
+                available ? "" : " (connection aborted)");
+    return PN_EOS;
   } else {
     transport->header_count += delta;
     if (transport->header_count == size) {
@@ -1102,21 +1070,20 @@ static ssize_t pn_input_read_amqp(pn_io_layer_t 
*io_layer, const char *bytes, si
   if (transport->close_rcvd) {
     if (available > 0) {
       pn_do_error(transport, "amqp:connection:framing-error", "data after 
close");
-      return PN_ERR;
-    } else {
       return PN_EOS;
     }
   }
 
   if (!available) {
     pn_do_error(transport, "amqp:connection:framing-error", "connection 
aborted");
-    return PN_ERR;
+    return PN_EOS;
   }
 
 
   ssize_t n = pn_dispatcher_input(transport->disp, bytes, available);
   if (n < 0) {
-    return pn_error_set(transport->error, n, "dispatch error");
+    //return pn_error_set(transport->error, n, "dispatch error");
+    return PN_EOS;
   } else if (transport->close_rcvd) {
     return PN_EOS;
   } else {
@@ -1230,6 +1197,15 @@ size_t pn_session_incoming_window(pn_session_t *ssn)
   }
 }
 
+static void pni_map_local_channel(pn_session_t *ssn)
+{
+  pn_transport_t *transport = ssn->connection->transport;
+  pn_session_state_t *state = &ssn->state;
+  uint16_t channel = allocate_alias(transport->local_channels);
+  state->local_channel = channel;
+  pn_hash_put(transport->local_channels, channel, ssn);
+}
+
 int pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
   if (endpoint->type == SESSION && transport->open_sent)
@@ -1238,16 +1214,14 @@ int pn_process_ssn_setup(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
     pn_session_state_t *state = &ssn->state;
     if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == 
(uint16_t) -1)
     {
-      uint16_t channel = allocate_alias(transport->local_channels);
+      pni_map_local_channel(ssn);
       state->incoming_window = pn_session_incoming_window(ssn);
       state->outgoing_window = pn_session_outgoing_window(ssn);
-      pn_post_frame(transport->disp, channel, "DL[?HIII]", BEGIN,
+      pn_post_frame(transport->disp, state->local_channel, "DL[?HIII]", BEGIN,
                     ((int16_t) state->remote_channel >= 0), 
state->remote_channel,
                     state->outgoing_transfer_count,
                     state->incoming_window,
                     state->outgoing_window);
-      state->local_channel = channel;
-      pn_hash_put(transport->local_channels, channel, ssn);
     }
   }
 
@@ -1258,18 +1232,25 @@ static const char *expiry_symbol(pn_expiry_policy_t 
policy)
 {
   switch (policy)
   {
-  case PN_LINK_CLOSE:
+  case PN_EXPIRE_WITH_LINK:
     return "link-detach";
-  case PN_SESSION_CLOSE:
+  case PN_EXPIRE_WITH_SESSION:
     return NULL;
-  case PN_CONNECTION_CLOSE:
+  case PN_EXPIRE_WITH_CONNECTION:
     return "connection-close";
-  case PN_NEVER:
+  case PN_EXPIRE_NEVER:
     return "never";
   }
   return NULL;
 }
 
+static void pni_map_local_handle(pn_link_t *link) {
+  pn_link_state_t *state = &link->state;
+  pn_session_state_t *ssn_state = &link->session->state;
+  state->local_handle = allocate_alias(ssn_state->local_handles);
+  pn_hash_put(ssn_state->local_handles, state->local_handle, link);
+}
+
 int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
   if (transport->open_sent && (endpoint->type == SENDER ||
@@ -1281,8 +1262,7 @@ int pn_process_link_setup(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
     if (((int16_t) ssn_state->local_channel >= 0) &&
         !(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == 
(uint32_t) -1)
     {
-      state->local_handle = allocate_alias(ssn_state->local_handles);
-      pn_hash_put(ssn_state->local_handles, state->local_handle, link);
+      pni_map_local_handle(link);
       const pn_distribution_mode_t dist_mode = link->source.distribution_mode;
       int err = pn_post_frame(transport->disp, ssn_state->local_channel,
                               "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", 
ATTACH,
@@ -1464,6 +1444,8 @@ int pn_process_tpwork_sender(pn_transport_t *transport, 
pn_delivery_t *delivery,
         link->queued--;
         link->session->outgoing_deliveries--;
       }
+
+      pn_collector_put(transport->connection->collector, PN_LINK_FLOW, link);
     }
   }
 
@@ -1573,6 +1555,14 @@ int pn_process_flow_sender(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
   return 0;
 }
 
+static void pni_unmap_local_handle(pn_link_t *link) {
+  pn_link_state_t *state = &link->state;
+  uintptr_t handle = state->local_handle;
+  state->local_handle = -2;
+  // may delete link
+  pn_hash_del(link->session->state.local_handles, handle);
+}
+
 int pn_process_link_teardown(pn_transport_t *transport, pn_endpoint_t 
*endpoint)
 {
   if (endpoint->type == SENDER || endpoint->type == RECEIVER)
@@ -1601,8 +1591,7 @@ int pn_process_link_teardown(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
       int err = pn_post_frame(transport->disp, ssn_state->local_channel, 
"DL[Io?DL[sSC]]", DETACH,
                               state->local_handle, true, (bool) name, ERROR, 
name, description, info);
       if (err) return err;
-      pn_hash_del(ssn_state->local_handles, state->local_handle);
-      state->local_handle = -2;
+      pni_unmap_local_handle(link);
     }
 
     pn_clear_modified(transport->connection, endpoint);
@@ -1634,6 +1623,17 @@ bool pn_pointful_buffering(pn_transport_t *transport, 
pn_session_t *session)
   return false;
 }
 
+static void pni_unmap_local_channel(pn_session_t *ssn) {
+  // XXX: should really update link state also
+  pni_transport_unbind_handles(ssn->state.local_handles);
+  pn_transport_t *transport = ssn->connection->transport;
+  pn_session_state_t *state = &ssn->state;
+  uintptr_t channel = state->local_channel;
+  state->local_channel = -2;
+  // may delete session
+  pn_hash_del(transport->local_channels, channel);
+}
+
 int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
   if (endpoint->type == SESSION)
@@ -1643,7 +1643,9 @@ int pn_process_ssn_teardown(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
     if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0
         && !transport->close_sent)
     {
-      if (pn_pointful_buffering(transport, session)) return 0;
+      if (pn_pointful_buffering(transport, session)) {
+        return 0;
+      }
 
       const char *name = NULL;
       const char *description = NULL;
@@ -1658,8 +1660,7 @@ int pn_process_ssn_teardown(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
       int err = pn_post_frame(transport->disp, state->local_channel, 
"DL[?DL[sSC]]", END,
                               (bool) name, ERROR, name, description, info);
       if (err) return err;
-      pn_hash_del(transport->local_channels, state->local_channel);
-      state->local_channel = -2;
+      pni_unmap_local_channel(session);
     }
 
     pn_clear_modified(transport->connection, endpoint);
@@ -1673,7 +1674,7 @@ int pn_process_conn_teardown(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
   {
     if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) {
       if (pn_pointful_buffering(transport, NULL)) return 0;
-      int err = pn_post_close(transport, NULL);
+      int err = pn_post_close(transport, NULL, NULL);
       if (err) return err;
       transport->close_sent = true;
     }
@@ -1733,13 +1734,10 @@ static ssize_t pn_output_write_header(pn_transport_t 
*transport,
 {
   if (transport->disp->trace & PN_TRACE_FRM)
     pn_transport_logf(transport, "  -> %s", protocol);
-  if (size >= hdrsize) {
-    memmove(bytes, header, hdrsize);
-    transport->io_layers[PN_IO_AMQP].process_output = next;
-    return hdrsize;
-  } else {
-    return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing 
%s header", protocol);
-  }
+  assert(size >= hdrsize);
+  memmove(bytes, header, hdrsize);
+  transport->io_layers[PN_IO_AMQP].process_output = next;
+  return hdrsize;
 }
 
 static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char 
*bytes, size_t size)
@@ -1752,22 +1750,19 @@ static ssize_t 
pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes,
 static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, 
size_t size)
 {
   pn_transport_t *transport = (pn_transport_t *)io_layer->context;
-  if (!transport->connection) {
-    return 0;
-  }
-
-  if (!pn_error_code(transport->error)) {
-    pn_error_set(transport->error, pn_process(transport), "process error");
+  if (transport->connection && !transport->done_processing) {
+    int err = pn_process(transport);
+    if (err) {
+      pn_transport_logf(transport, "process error %i", err);
+      transport->done_processing = true;
+    }
   }
 
-  // write out any buffered data _before_ returning an error code,
-  // else we could truncate an outgoing Close frame containing a
-  // useful error status
-  if (!transport->disp->available && (transport->close_sent || 
pn_error_code(transport->error))) {
-    if (pn_error_code(transport->error))
-      return pn_error_code(transport->error);
-    else
-      return PN_EOS;
+  // write out any buffered data _before_ returning PN_EOS, else we
+  // could truncate an outgoing Close frame containing a useful error
+  // status
+  if (!transport->disp->available && transport->close_sent) {
+    return PN_EOS;
   }
 
   return pn_dispatcher_output(transport->disp, bytes, size);
@@ -1779,20 +1774,18 @@ static ssize_t transport_produce(pn_transport_t 
*transport)
   pn_io_layer_t *io_layer = transport->io_layers;
   ssize_t space = transport->output_size - transport->output_pending;
 
-  if (space == 0) {     // can we expand the buffer?
+  if (space <= 0) {     // can we expand the buffer?
     int more = 0;
     if (!transport->remote_max_frame)   // no limit, so double it
       more = transport->output_size;
     else if (transport->remote_max_frame > transport->output_size)
-      more = transport->remote_max_frame - transport->output_size;
+      more = pn_min(transport->output_size, transport->remote_max_frame - 
transport->output_size);
     if (more) {
-      char *newbuf = (char *)malloc( transport->output_size + more );
+      char *newbuf = (char *)realloc( transport->output_buf, 
transport->output_size + more );
       if (newbuf) {
-        memmove( newbuf, transport->output_buf, transport->output_pending );
-        free( transport->output_buf );
         transport->output_buf = newbuf;
         transport->output_size += more;
-        space = more;
+        space += more;
       }
     }
   }
@@ -1811,11 +1804,12 @@ static ssize_t transport_produce(pn_transport_t 
*transport)
       if (transport->output_pending)
         break;   // return what is available
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) {
-        if (n == PN_EOS)
+        if (n < 0) {
           pn_transport_log(transport, "  -> EOS");
-        else
+        }
+        /*else
           pn_transport_logf(transport, "  -> EOS (%" PN_ZI ") %s", n,
-                            pn_error_text(transport->error));
+          pn_error_text(transport->error));*/
       }
       return n;
     }
@@ -1993,22 +1987,20 @@ ssize_t pn_transport_capacity(pn_transport_t 
*transport)  /* <0 == done */
   //if (pn_error_code(transport->error)) return pn_error_code(transport->error);
 
   ssize_t capacity = transport->input_size - transport->input_pending;
-  if (!capacity) {
+  if ( capacity<=0 ) {
     // can we expand the size of the input buffer?
     int more = 0;
     if (!transport->local_max_frame) {  // no limit (ha!)
       more = transport->input_size;
     } else if (transport->local_max_frame > transport->input_size) {
-      more = transport->local_max_frame - transport->input_size;
+      more = pn_min(transport->input_size, transport->local_max_frame - 
transport->input_size);
     }
     if (more) {
-      char *newbuf = (char *) malloc( transport->input_size + more );
+      char *newbuf = (char *) realloc( transport->input_buf, 
transport->input_size + more );
       if (newbuf) {
-        memmove( newbuf, transport->input_buf, transport->input_pending );
-        free( transport->input_buf );
         transport->input_buf = newbuf;
         transport->input_size += more;
-        capacity = more;
+        capacity += more;
       }
     }
   }
@@ -2024,7 +2016,7 @@ char *pn_transport_tail(pn_transport_t *transport)
   return NULL;
 }
 
-int pn_transport_push(pn_transport_t *transport, const char *src, size_t size)
+ssize_t pn_transport_push(pn_transport_t *transport, const char *src, size_t 
size)
 {
   assert(transport);
 
@@ -2032,14 +2024,19 @@ int pn_transport_push(pn_transport_t *transport, const 
char *src, size_t size)
   if (capacity < 0) {
     return capacity;
   } else if (size > (size_t) capacity) {
-    return PN_OVERFLOW;
+    size = capacity;
   }
 
   char *dst = pn_transport_tail(transport);
   assert(dst);
   memmove(dst, src, size);
 
-  return pn_transport_process(transport, size);
+  int n = pn_transport_process(transport, size);
+  if (n < 0) {
+    return n;
+  } else {
+    return size;
+  }
 }
 
 int pn_transport_process(pn_transport_t *transport, size_t size)
@@ -2062,8 +2059,7 @@ int pn_transport_process(pn_transport_t *transport, 
size_t size)
 int pn_transport_close_tail(pn_transport_t *transport)
 {
   transport->tail_closed = true;
-  ssize_t x = transport_consume( transport );
-  if (x < 0) return (int) x;
+  transport_consume( transport );
   return 0;
   // XXX: what if not all input processed at this point?  do we care???
 }
@@ -2084,7 +2080,7 @@ const char *pn_transport_head(pn_transport_t *transport)
   return NULL;
 }
 
-int pn_transport_peek(pn_transport_t *transport, char *dst, size_t size)
+ssize_t pn_transport_peek(pn_transport_t *transport, char *dst, size_t size)
 {
   assert(transport);
 
@@ -2092,7 +2088,7 @@ int pn_transport_peek(pn_transport_t *transport, char 
*dst, size_t size)
   if (pending < 0) {
     return pending;
   } else if (size > (size_t) pending) {
-    return PN_UNDERFLOW;
+    size = pending;
   }
 
   if (pending > 0) {
@@ -2101,7 +2097,7 @@ int pn_transport_peek(pn_transport_t *transport, char 
*dst, size_t size)
     memmove(dst, src, size);
   }
 
-  return 0;
+  return size;
 }
 
 void pn_transport_pop(pn_transport_t *transport, size_t size)
@@ -2120,11 +2116,7 @@ void pn_transport_pop(pn_transport_t *transport, size_t 
size)
 int pn_transport_close_head(pn_transport_t *transport)
 {
   transport->head_closed = true;
-  if (transport->close_sent && transport->output_pending == 0) {
-    return 0;
-  } else {
-    return pn_error_set(transport->error, PN_ERR, "connection aborted");
-  }
+  return 0;
 }
 
 // true if the transport will not generate further output

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/types.c
----------------------------------------------------------------------
diff --git a/proton-c/src/types.c b/proton-c/src/types.c
index 1179db2..51abc59 100644
--- a/proton-c/src/types.c
+++ b/proton-c/src/types.c
@@ -23,20 +23,8 @@
 #include <stdlib.h>
 #include <string.h>
 
-pn_bytes_t pn_bytes(size_t size, char *start)
+pn_bytes_t pn_bytes(size_t size, const char *start)
 {
   pn_bytes_t bytes = {size, start};
   return bytes;
 }
-
-pn_bytes_t pn_bytes_dup(size_t size, const char *start)
-{
-  if (size && start)
-  {
-    char *dup = (char *) malloc(size);
-    memmove(dup, start, size);
-    return pn_bytes(size, dup);
-  } else {
-    return pn_bytes(0, NULL);
-  }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/util.c
----------------------------------------------------------------------
diff --git a/proton-c/src/util.c b/proton-c/src/util.c
index e6b9af0..16e3685 100644
--- a/proton-c/src/util.c
+++ b/proton-c/src/util.c
@@ -139,20 +139,28 @@ void pni_urldecode(const char *src, char *dst)
 
 // Parse URL syntax:
 // [ <scheme> :// ] [ <user> [ : <password> ] @ ] <host> [ : <port> ] [ / <path> ]
-// <user>, <password>, <host>, <port> cannot contain any of '@', ':', '/'
+// <scheme>, <user>, <password>, <port> cannot contain any of '@', ':', '/'
+// If the first character of <host> is '[' then it can contain any character up to ']' (this is to allow IPv6
+// literal syntax). Otherwise it also cannot contain '@', ':', '/'
+// <host> is not optional but it can be null! If it is not present an empty string will be returned
 // <path> can contain any character
 void pni_parse_url(char *url, char **scheme, char **user, char **pass, char 
**host, char **port, char **path)
 {
   if (!url) return;
 
-  char *scheme_end = strstr(url, "://");
-  if (scheme_end) {
-    *scheme_end = '\0';
-    *scheme = url;
-    url = scheme_end + 3;
+  char *slash = strchr(url, '/');
+
+  if (slash && slash>url) {
+    char *scheme_end = strstr(slash-1, "://");
+
+    if (scheme_end && scheme_end<slash) {
+      *scheme_end = '\0';
+      *scheme = url;
+      url = scheme_end + 3;
+      slash = strchr(url, '/');
+    }
   }
 
-  char *slash = strchr(url, '/');
   if (slash) {
     *slash = '\0';
     *path = slash + 1;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to