http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/messenger/messenger.c ---------------------------------------------------------------------- diff --git a/proton-c/src/messenger/messenger.c b/proton-c/src/messenger/messenger.c index 29b2eeb..0e2488b 100644 --- a/proton-c/src/messenger/messenger.c +++ b/proton-c/src/messenger/messenger.c @@ -31,6 +31,7 @@ #include <stdlib.h> #include <string.h> #include <stdio.h> + #include "../util.h" #include "../platform.h" #include "../platform_fmt.h" @@ -60,30 +61,20 @@ typedef enum { } pn_link_credit_mode_t; struct pn_messenger_t { + pn_address_t address; char *name; char *certificate; char *private_key; char *password; char *trusted_certificates; - int timeout; - bool blocking; - bool passive; pn_io_t *io; pn_list_t *pending; // pending selectables pn_selectable_t *interruptor; - bool interrupted; pn_socket_t ctrl[2]; pn_list_t *listeners; pn_list_t *connections; pn_selector_t *selector; pn_collector_t *collector; - int send_threshold; - pn_link_credit_mode_t credit_mode; - int credit_batch; // when LINK_CREDIT_AUTO - int credit; // available - int distributed; // credit - int receivers; // # receiver links - int draining; // # links in drain state pn_list_t *credited; pn_list_t *blocked; pn_timestamp_t next_drain; @@ -95,13 +86,24 @@ struct pn_messenger_t { pn_error_t *error; pn_transform_t *routes; pn_transform_t *rewrites; - pn_address_t address; pn_tracker_t outgoing_tracker; pn_tracker_t incoming_tracker; pn_string_t *original; pn_string_t *rewritten; - bool worked; + pn_string_t *domain; + int timeout; + int send_threshold; + pn_link_credit_mode_t credit_mode; + int credit_batch; // when LINK_CREDIT_AUTO + int credit; // available + int distributed; // credit + int receivers; // # receiver links + int draining; // # links in drain state int connection_error; + bool blocking; + bool passive; + bool interrupted; + bool worked; }; #define CTX_HEAD \ @@ -611,7 +613,7 @@ pn_messenger_t *pn_messenger(const char *name) pni_selectable_set_context(m->interruptor, m); m->listeners = pn_list(0, 0); m->connections = pn_list(0, 0); - m->selector = pn_selector(); + m->selector = pn_io_selector(m->io); m->collector = pn_collector(); m->credit_mode = LINK_CREDIT_EXPLICIT; m->credit_batch = 1024; @@ -635,6 +637,7 @@ pn_messenger_t *pn_messenger(const char *name) m->address.text = pn_string(NULL); m->original = pn_string(NULL); m->rewritten = pn_string(NULL); + m->domain = pn_string(NULL); m->connection_error = 0; } @@ -775,6 +778,7 @@ static void pni_reclaim(pn_messenger_t *messenger) void pn_messenger_free(pn_messenger_t *messenger) { if (messenger) { + pn_free(messenger->domain); pn_free(messenger->rewritten); pn_free(messenger->original); pn_free(messenger->address.text); @@ -971,7 +975,7 @@ int pni_pump_in(pn_messenger_t *messenger, const char *address, pn_link_t *recei size_t pending = pn_delivery_pending(d); int err = pn_buffer_ensure(buf, pending + 1); if (err) return pn_error_format(messenger->error, err, "get: error growing buffer"); - char *encoded = pn_buffer_bytes(buf).start; + char *encoded = pn_buffer_memory(buf).start; ssize_t n = pn_link_recv(receiver, encoded, pending); if (n != (ssize_t) pending) { return pn_error_format(messenger->error, n, @@ -1221,16 +1225,31 @@ int pn_messenger_process_events(pn_messenger_t *messenger) while ((event = pn_collector_peek(messenger->collector))) { processed++; switch (pn_event_type(event)) { - case PN_CONNECTION_REMOTE_STATE: - case PN_CONNECTION_LOCAL_STATE: + case PN_CONNECTION_INIT: + //printf("connection created: %p\n", (void *) pn_event_connection(event)); + break; + case PN_SESSION_INIT: + //printf("session created: %p\n", (void *) pn_event_session(event)); + break; + case PN_LINK_INIT: + //printf("link created: %p\n", (void *) pn_event_link(event)); + break; + case PN_CONNECTION_REMOTE_OPEN: + case PN_CONNECTION_REMOTE_CLOSE: + case PN_CONNECTION_OPEN: + case PN_CONNECTION_CLOSE: pn_messenger_process_connection(messenger, event); break; - case PN_SESSION_REMOTE_STATE: - case PN_SESSION_LOCAL_STATE: + case PN_SESSION_REMOTE_OPEN: + case PN_SESSION_REMOTE_CLOSE: + case PN_SESSION_OPEN: + case PN_SESSION_CLOSE: pn_messenger_process_session(messenger, event); break; - case PN_LINK_REMOTE_STATE: - case PN_LINK_LOCAL_STATE: + case PN_LINK_REMOTE_OPEN: + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_OPEN: + case PN_LINK_CLOSE: pn_messenger_process_link(messenger, event); break; case PN_LINK_FLOW: @@ -1244,6 +1263,12 @@ int pn_messenger_process_events(pn_messenger_t *messenger) break; case PN_EVENT_NONE: break; + case PN_CONNECTION_FINAL: + break; + case PN_SESSION_FINAL: + break; + case PN_LINK_FINAL: + break; } pn_collector_pop(messenger->collector); } @@ -1251,8 +1276,37 @@ int pn_messenger_process_events(pn_messenger_t *messenger) return processed; } +/** + * Function to invoke AMQP related timer events, such as a heartbeat to prevent + * remote_idle timeout events + */ +static void pni_messenger_tick(pn_messenger_t *messenger) +{ + for (size_t i = 0; i < pn_list_size(messenger->connections); i++) { + pn_connection_t *connection = + (pn_connection_t *)pn_list_get(messenger->connections, i); + pn_transport_t *transport = pn_connection_transport(connection); + if (transport) { + pn_transport_tick(transport, pn_i_now()); + + // if there is pending data, such as an empty heartbeat frame, call + // process events. This should kick off the chain of selectables for + // reading/writing. + ssize_t pending = pn_transport_pending(transport); + if (pending > 0) { + pn_connection_ctx_t *cctx = + (pn_connection_ctx_t *)pn_connection_get_context(connection); + pn_messenger_process_events(messenger); + pn_messenger_flow(messenger); + pni_conn_modified(pni_context(cctx->selectable)); + } + } + } +} + int pn_messenger_process(pn_messenger_t *messenger) { + bool doMessengerTick = true; pn_selectable_t *sel; int events; while ((sel = pn_selector_next(messenger->selector, &events))) { @@ -1261,12 +1315,17 @@ int pn_messenger_process(pn_messenger_t *messenger) } if (events & PN_WRITABLE) { pn_selectable_writable(sel); + doMessengerTick = false; } if (events & PN_EXPIRED) { pn_selectable_expired(sel); } } - + // ensure timer events are processed. Cannot call this inside the while loop + // as the timer events are not seen by the selector + if (doMessengerTick) { + pni_messenger_tick(messenger); + } if (messenger->interrupted) { messenger->interrupted = false; return PN_INTR; @@ -1429,12 +1488,7 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *add { assert(messenger); messenger->connection_error = 0; - char domain[1024]; - if (address && sizeof(domain) < strlen(address) + 1) { - pn_error_format(messenger->error, PN_ERR, - "address exceeded maximum length: %s", address); - return NULL; - } + pn_string_t *domain = messenger->domain; int err = pni_route(messenger, address); if (err) return NULL; @@ -1459,16 +1513,14 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *add return NULL; } - domain[0] = '\0'; + pn_string_set(domain, ""); if (user) { - strcat(domain, user); - strcat(domain, "@"); + pn_string_addf(domain, "%s@", user); } - strcat(domain, host); + pn_string_addf(domain, "%s", host); if (port) { - strcat(domain, ":"); - strcat(domain, port); + pn_string_addf(domain, ":%s", port); } for (size_t i = 0; i < pn_list_size(messenger->connections); i++) { @@ -1480,7 +1532,7 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *add return connection; } const char *container = pn_connection_remote_container(connection); - if (pn_streq(container, domain)) { + if (pn_streq(container, pn_string_get(domain))) { return connection; } } @@ -1529,7 +1581,16 @@ pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, boo pn_session_t *ssn = pn_session(connection); pn_session_open(ssn); - link = sender ? pn_sender(ssn, "sender-xxx") : pn_receiver(ssn, "receiver-xxx"); + if (sender) { + link = pn_sender(ssn, "sender-xxx"); + } else { + if (name) { + link = pn_receiver(ssn, name); + } else { + link = pn_receiver(ssn, ""); + } + } + if ((sender && pn_messenger_get_outgoing_window(messenger)) || (!sender && pn_messenger_get_incoming_window(messenger))) { // use explicit settlement via dispositions (not pre-settled) @@ -1662,7 +1723,7 @@ int pni_pump_out(pn_messenger_t *messenger, const char *address, pn_link_t *send pn_buffer_t *buf = pni_entry_bytes(entry); pn_bytes_t bytes = pn_buffer_bytes(buf); - char *encoded = bytes.start; + const char *encoded = bytes.start; size_t size = bytes.size; // XXX: proper tag @@ -1742,7 +1803,7 @@ int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg) pni_rewrite(messenger, msg); while (true) { - char *encoded = pn_buffer_bytes(buf).start; + char *encoded = pn_buffer_memory(buf).start; size_t size = pn_buffer_capacity(buf); int err = pn_message_encode(msg, encoded, &size); if (err == PN_OVERFLOW) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/messenger/store.c ---------------------------------------------------------------------- diff --git a/proton-c/src/messenger/store.c b/proton-c/src/messenger/store.c index e36c5bb..88d6a5d 100644 --- a/proton-c/src/messenger/store.c +++ b/proton-c/src/messenger/store.c @@ -34,36 +34,36 @@ typedef struct pni_stream_t pni_stream_t; struct pni_store_t { - size_t size; pni_stream_t *streams; pni_entry_t *store_head; pni_entry_t *store_tail; + pn_hash_t *tracked; + size_t size; int window; pn_sequence_t lwm; pn_sequence_t hwm; - pn_hash_t *tracked; }; struct pni_stream_t { pni_store_t *store; - char address[1024]; // XXX + pn_string_t *address; pni_entry_t *stream_head; pni_entry_t *stream_tail; pni_stream_t *next; }; struct pni_entry_t { - pn_sequence_t id; pni_stream_t *stream; - bool free; pni_entry_t *stream_next; pni_entry_t *stream_prev; pni_entry_t *store_next; pni_entry_t *store_prev; - pn_status_t status; pn_buffer_t *bytes; pn_delivery_t *delivery; void *context; + pn_status_t status; + pn_sequence_t id; + bool free; }; void pni_entry_finalize(void *object) @@ -104,13 +104,11 @@ pni_stream_t *pni_stream(pni_store_t *store, const char *address, bool create) { assert(store); assert(address); - // XXX - if (strlen(address) >= 1024) return NULL; pni_stream_t *prev = NULL; pni_stream_t *stream = store->streams; while (stream) { - if (!strcmp(stream->address, address)) { + if (!strcmp(pn_string_get(stream->address), address)) { return stream; } prev = stream; @@ -120,7 +118,7 @@ pni_stream_t *pni_stream(pni_store_t *store, const char *address, bool create) if (create) { stream = (pni_stream_t *) malloc(sizeof(pni_stream_t)); stream->store = store; - strcpy(stream->address, address); + stream->address = pn_string(address); stream->stream_head = NULL; stream->stream_tail = NULL; stream->next = NULL; @@ -169,6 +167,8 @@ void pni_stream_free(pni_stream_t *stream) while ((entry = LL_HEAD(stream, stream))) { pni_entry_free(entry); } + pn_free(stream->address); + stream->address = NULL; free(stream); } @@ -205,7 +205,7 @@ pni_stream_t *pni_stream_get(pni_store_t *store, const char *address) pni_entry_t *pni_store_put(pni_store_t *store, const char *address) { assert(store); - static pn_class_t clazz = PN_CLASS(pni_entry); + static const pn_class_t clazz = PN_CLASS(pni_entry); if (!address) address = ""; pni_stream_t *stream = pni_stream_put(store, address); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/messenger/subscription.c ---------------------------------------------------------------------- diff --git a/proton-c/src/messenger/subscription.c b/proton-c/src/messenger/subscription.c index 95f3f09..346a23f 100644 --- a/proton-c/src/messenger/subscription.c +++ b/proton-c/src/messenger/subscription.c @@ -64,7 +64,7 @@ pn_subscription_t *pn_subscription(pn_messenger_t *messenger, const char *host, const char *port) { - static pn_class_t clazz = PN_CLASS(pn_subscription); + static const pn_class_t clazz = PN_CLASS(pn_subscription); pn_subscription_t *sub = (pn_subscription_t *) pn_new(sizeof(pn_subscription_t), &clazz); sub->messenger = messenger; pn_string_set(sub->scheme, scheme); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/messenger/transform.c ---------------------------------------------------------------------- diff --git a/proton-c/src/messenger/transform.c b/proton-c/src/messenger/transform.c index c9d2c14..801eb10 100644 --- a/proton-c/src/messenger/transform.c +++ b/proton-c/src/messenger/transform.c @@ -62,7 +62,7 @@ static void pn_rule_finalize(void *object) pn_rule_t *pn_rule(const char *pattern, const char *substitution) { - static pn_class_t clazz = PN_CLASS(pn_rule); + static const pn_class_t clazz = PN_CLASS(pn_rule); pn_rule_t *rule = (pn_rule_t *) pn_new(sizeof(pn_rule_t), &clazz); rule->pattern = pn_string(pattern); rule->substitution = pn_string(substitution); @@ -82,7 +82,7 @@ static void pn_transform_finalize(void *object) pn_transform_t *pn_transform() { - static pn_class_t clazz = PN_CLASS(pn_transform); + static const pn_class_t clazz = PN_CLASS(pn_transform); pn_transform_t *transform = (pn_transform_t *) pn_new(sizeof(pn_transform_t), &clazz); transform->rules = pn_list(0, PN_REFCOUNT); transform->matched = false; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/object/object.c ---------------------------------------------------------------------- diff --git a/proton-c/src/object/object.c b/proton-c/src/object/object.c index dd5fd17..d1b0e43 100644 --- a/proton-c/src/object/object.c +++ b/proton-c/src/object/object.c @@ -29,14 +29,19 @@ #include <ctype.h> typedef struct { - pn_class_t *clazz; + const pn_class_t *clazz; int refcount; } pni_head_t; #define pni_head(PTR) \ (((pni_head_t *) (PTR)) - 1) -void *pn_new(size_t size, pn_class_t *clazz) +void *pn_new(size_t size, const pn_class_t *clazz) +{ + return pn_new2(size, clazz, NULL); +} + +void *pn_new2(size_t size, const pn_class_t *clazz, void *from) { pni_head_t *head = (pni_head_t *) malloc(sizeof(pni_head_t) + size); void *object = head + 1; @@ -44,7 +49,7 @@ void *pn_new(size_t size, pn_class_t *clazz) return object; } -void pn_initialize(void *object, pn_class_t *clazz) +void pn_initialize(void *object, const pn_class_t *clazz) { pni_head_t *head = pni_head(object); head->clazz = clazz; @@ -54,15 +59,22 @@ void pn_initialize(void *object, pn_class_t *clazz) } } -void *pn_incref(void *object) -{ +void *pn_incref(void *object) { + return pn_incref2(object, NULL); +} + +void *pn_incref2(void *object, void *from) { if (object) { pni_head(object)->refcount++; } return object; } -void pn_decref(void *object) +void pn_decref(void *object) { + pn_decref2(object, NULL); +} + +void pn_decref2(void *object, void *from) { if (object) { pni_head_t *head = pni_head(object); @@ -70,7 +82,11 @@ void pn_decref(void *object) head->refcount--; if (!head->refcount) { pn_finalize(object); - free(head); + // Check the refcount again in case finalize created a new + // reference. + if (!head->refcount) { + free(head); + } } } } @@ -83,7 +99,6 @@ void pn_finalize(void *object) if (head->clazz && head->clazz->finalize) { head->clazz->finalize(object); } - head->refcount = 0; } } @@ -101,7 +116,7 @@ void pn_free(void *object) } } -pn_class_t *pn_class(void *object) +const pn_class_t *pn_class(void *object) { assert(object); return pni_head(object)->clazz; @@ -127,7 +142,7 @@ intptr_t pn_compare(void *a, void *b) pni_head_t *hb = pni_head(b); if (ha->clazz && hb->clazz && ha->clazz == hb->clazz) { - pn_class_t *clazz = ha->clazz; + const pn_class_t *clazz = ha->clazz; if (clazz->compare) { return clazz->compare(a, b); } @@ -150,13 +165,20 @@ int pn_inspect(void *object, pn_string_t *dst) if (object) { pni_head_t *head = pni_head(object); + const char *name; if (head->clazz) { - pn_class_t *clazz = head->clazz; + const pn_class_t *clazz = head->clazz; if (clazz->inspect) { return clazz->inspect(object, dst); + } else if (clazz->name) { + name = clazz->name; + } else { + name = "object"; } + } else { + name = "object"; } - return pn_string_addf(dst, "object<%p>", object); + return pn_string_addf(dst, "%s<%p>", name, object); } else { return pn_string_addf(dst, "(null)"); } @@ -185,9 +207,9 @@ void pn_list_set(pn_list_t *list, int index, void *value) { assert(list); assert(list->size); void *old = list->elements[index % list->size]; - if (list->options & PN_REFCOUNT) pn_decref(old); + if (list->options & PN_REFCOUNT) pn_decref2(old, list); list->elements[index % list->size] = value; - if (list->options & PN_REFCOUNT) pn_incref(value); + if (list->options & PN_REFCOUNT) pn_incref2(value, list); } void pn_list_ensure(pn_list_t *list, size_t capacity) @@ -207,7 +229,7 @@ int pn_list_add(pn_list_t *list, void *value) assert(list); pn_list_ensure(list, list->size + 1); list->elements[list->size++] = value; - if (list->options & PN_REFCOUNT) pn_incref(value); + if (list->options & PN_REFCOUNT) pn_incref2(value, list); return 0; } @@ -242,7 +264,7 @@ void pn_list_del(pn_list_t *list, int index, int n) if (list->options & PN_REFCOUNT) { for (int i = 0; i < n; i++) { - pn_decref(list->elements[index + i]); + pn_decref2(list->elements[index + i], list); } } @@ -294,7 +316,7 @@ static void pn_list_finalize(void *object) assert(object); pn_list_t *list = (pn_list_t *) object; for (size_t i = 0; i < list->size; i++) { - if (list->options & PN_REFCOUNT) pn_decref(pn_list_get(list, i)); + if (list->options & PN_REFCOUNT) pn_decref2(pn_list_get(list, i), list); } free(list->elements); } @@ -354,7 +376,7 @@ static int pn_list_inspect(void *obj, pn_string_t *dst) pn_list_t *pn_list(size_t capacity, int options) { - static pn_class_t clazz = PN_CLASS(pn_list); + static const pn_class_t clazz = PN_CLASS(pn_list); pn_list_t *list = (pn_list_t *) pn_new(sizeof(pn_list_t), &clazz); list->capacity = capacity ? capacity : 16; @@ -380,11 +402,12 @@ struct pn_map_t { size_t capacity; size_t addressable; size_t size; - float load_factor; uintptr_t (*hashcode)(void *key); bool (*equals)(void *a, void *b); + float load_factor; bool count_keys; bool count_values; + bool inspect_keys; }; static void pn_map_finalize(void *object) @@ -394,8 +417,8 @@ static void pn_map_finalize(void *object) if (map->count_keys || map->count_values) { for (size_t i = 0; i < map->capacity; i++) { if (map->entries[i].state != PNI_ENTRY_FREE) { - if (map->count_keys) pn_decref(map->entries[i].key); - if (map->count_values) pn_decref(map->entries[i].value); + if (map->count_keys) pn_decref2(map->entries[i].key, map); + if (map->count_values) pn_decref2(map->entries[i].value, map); } } } @@ -447,7 +470,11 @@ static int pn_map_inspect(void *obj, pn_string_t *dst) err = pn_string_addf(dst, ", "); if (err) return err; } - err = pn_inspect(pn_map_key(map, entry), dst); + if (map->inspect_keys) { + err = pn_inspect(pn_map_key(map, entry), dst); + } else { + err = pn_string_addf(dst, "%p", pn_map_key(map, entry)); + } if (err) return err; err = pn_string_addf(dst, ": "); if (err) return err; @@ -463,7 +490,7 @@ static int pn_map_inspect(void *obj, pn_string_t *dst) pn_map_t *pn_map(size_t capacity, float load_factor, int options) { - static pn_class_t clazz = PN_CLASS(pn_map); + static const pn_class_t clazz = PN_CLASS(pn_map); pn_map_t *map = (pn_map_t *) pn_new(sizeof(pn_map_t), &clazz); map->capacity = capacity ? capacity : 16; @@ -474,6 +501,7 @@ pn_map_t *pn_map(size_t capacity, float load_factor, int options) map->equals = pn_equals; map->count_keys = (options & PN_REFCOUNT) || (options & PN_REFCOUNT_KEY); map->count_values = (options & PN_REFCOUNT) || (options & PN_REFCOUNT_VALUE); + map->inspect_keys = true; pni_map_allocate(map); return map; } @@ -492,7 +520,7 @@ static float pni_map_load(pn_map_t *map) static bool pni_map_ensure(pn_map_t *map, size_t capacity) { float load = pni_map_load(map); - if (capacity <= map->capacity && load < map->load_factor) { + if (capacity <= map->capacity && load <= map->load_factor) { return false; } @@ -511,8 +539,8 @@ static bool pni_map_ensure(pn_map_t *map, size_t capacity) void *key = entries[i].key; void *value = entries[i].value; pn_map_put(map, key, value); - if (map->count_keys) pn_decref(key); - if (map->count_values) pn_decref(value); + if (map->count_keys) pn_decref2(key, map); + if (map->count_values) pn_decref2(value, map); } } @@ -531,7 +559,7 @@ static pni_entry_t *pni_map_entry(pn_map_t *map, void *key, pni_entry_t **pprev, if (create) { entry->state = PNI_ENTRY_TAIL; entry->key = key; - if (map->count_keys) pn_incref(key); + if (map->count_keys) pn_incref2(key, map); map->size++; return entry; } else { @@ -571,7 +599,7 @@ static pni_entry_t *pni_map_entry(pn_map_t *map, void *key, pni_entry_t **pprev, entry->state = PNI_ENTRY_LINK; map->entries[empty].state = PNI_ENTRY_TAIL; map->entries[empty].key = key; - if (map->count_keys) pn_incref(key); + if (map->count_keys) pn_incref2(key, map); if (pprev) *pprev = entry; map->size++; return &map->entries[empty]; @@ -584,9 +612,9 @@ int pn_map_put(pn_map_t *map, void *key, void *value) { assert(map); pni_entry_t *entry = pni_map_entry(map, key, NULL, true); - if (map->count_values) pn_decref(entry->value); + if (map->count_values) pn_decref2(entry->value, map); entry->value = value; - if (map->count_values) pn_incref(value); + if (map->count_values) pn_incref2(value, map); return 0; } @@ -603,17 +631,24 @@ void pn_map_del(pn_map_t *map, void *key) pni_entry_t *prev = NULL; pni_entry_t *entry = pni_map_entry(map, key, &prev, false); if (entry) { + void *dref_key = (map->count_keys) ? entry->key : NULL; + void *dref_value = (map->count_values) ? entry->value : NULL; if (prev) { prev->next = entry->next; prev->state = entry->state; + } else if (entry->next) { + assert(entry->state == PNI_ENTRY_LINK); + pni_entry_t *next = &map->entries[entry->next]; + *entry = *next; + entry = next; } entry->state = PNI_ENTRY_FREE; entry->next = 0; - if (map->count_keys) pn_decref(entry->key); entry->key = NULL; - if (map->count_values) pn_decref(entry->value); entry->value = NULL; map->size--; + if (dref_key) pn_decref2(dref_key, map); + if (dref_value) pn_decref2(dref_value, map); } } @@ -676,6 +711,7 @@ pn_hash_t *pn_hash(size_t capacity, float load_factor, int options) hash->map.equals = pni_identity_equals; hash->map.count_keys = false; hash->map.count_values = options & PN_REFCOUNT; + hash->map.inspect_keys = false; return hash; } @@ -796,7 +832,7 @@ pn_string_t *pn_string(const char *bytes) pn_string_t *pn_stringn(const char *bytes, size_t n) { - static pn_class_t clazz = PN_CLASS(pn_string); + static const pn_class_t clazz = PN_CLASS(pn_string); pn_string_t *string = (pn_string_t *) pn_new(sizeof(pn_string_t), &clazz); string->capacity = n ? n * sizeof(char) : 16; string->bytes = (char *) malloc(string->capacity); @@ -985,7 +1021,7 @@ static void pn_iterator_finalize(void *object) pn_iterator_t *pn_iterator() { - static pn_class_t clazz = PN_CLASS(pn_iterator); + static const pn_class_t clazz = PN_CLASS(pn_iterator); pn_iterator_t *it = (pn_iterator_t *) pn_new(sizeof(pn_iterator_t), &clazz); return it; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/parser.c ---------------------------------------------------------------------- diff --git a/proton-c/src/parser.c b/proton-c/src/parser.c index 6766607..fccba75 100644 --- a/proton-c/src/parser.c +++ b/proton-c/src/parser.c @@ -29,10 +29,10 @@ struct pn_parser_t { pn_scanner_t *scanner; - int error_code; char *atoms; size_t size; size_t capacity; + int error_code; }; pn_parser_t *pn_parser() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/platform.h ---------------------------------------------------------------------- diff --git a/proton-c/src/platform.h b/proton-c/src/platform.h index 6b63e2e..b0475e0 100644 --- a/proton-c/src/platform.h +++ b/proton-c/src/platform.h @@ -85,8 +85,10 @@ int pn_i_vsnprintf(char *buf, size_t count, const char *fmt, va_list ap); #endif #if defined _MSC_VER || defined _OPENVMS +#if !defined(va_copy) #define va_copy(d,s) ((d) = (s)) #endif +#endif #ifdef __cplusplus } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/posix/driver.c ---------------------------------------------------------------------- diff --git a/proton-c/src/posix/driver.c b/proton-c/src/posix/driver.c index 0ce8f9f..b128cb2 100644 --- a/proton-c/src/posix/driver.c +++ b/proton-c/src/posix/driver.c @@ -64,19 +64,19 @@ struct pn_driver_t { struct pollfd *fds; size_t nfds; int ctrl[2]; //pipe for updating selectable status - pn_trace_t trace; pn_timestamp_t wakeup; + pn_trace_t trace; }; struct pn_listener_t { pn_driver_t *driver; pn_listener_t *listener_next; pn_listener_t *listener_prev; + void *context; int idx; - bool pending; int fd; + bool pending; bool closed; - void *context; }; #define PN_NAME_MAX (256) @@ -86,22 +86,22 @@ struct pn_connector_t { pn_connector_t *connector_next; pn_connector_t *connector_prev; char name[PN_NAME_MAX]; + pn_timestamp_t wakeup; + pn_connection_t *connection; + pn_transport_t *transport; + pn_sasl_t *sasl; + pn_listener_t *listener; + void *context; int idx; - bool pending_tick; - bool pending_read; - bool pending_write; int fd; int status; pn_trace_t trace; + bool pending_tick; + bool pending_read; + bool pending_write; bool closed; - pn_timestamp_t wakeup; - pn_connection_t *connection; - pn_transport_t *transport; - pn_sasl_t *sasl; bool input_done; bool output_done; - pn_listener_t *listener; - void *context; }; /* Impls */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/posix/io.c ---------------------------------------------------------------------- diff --git a/proton-c/src/posix/io.c b/proton-c/src/posix/io.c index 11379ff..4fa223f 100644 --- a/proton-c/src/posix/io.c +++ b/proton-c/src/posix/io.c @@ -21,6 +21,7 @@ #include <proton/io.h> #include <proton/object.h> +#include <proton/selector.h> #include <ctype.h> #include <errno.h> @@ -43,6 +44,7 @@ struct pn_io_t { char host[MAX_HOST]; char serv[MAX_SERV]; pn_error_t *error; + pn_selector_t *selector; bool wouldblock; }; @@ -51,6 +53,7 @@ void pn_io_initialize(void *obj) pn_io_t *io = (pn_io_t *) obj; io->error = pn_error(); io->wouldblock = false; + io->selector = NULL; } void pn_io_finalize(void *obj) @@ -65,7 +68,7 @@ void pn_io_finalize(void *obj) pn_io_t *pn_io(void) { - static pn_class_t clazz = PN_CLASS(pn_io); + static const pn_class_t clazz = PN_CLASS(pn_io); pn_io_t *io = (pn_io_t *) pn_new(sizeof(pn_io_t), &clazz); return io; } @@ -275,3 +278,10 @@ bool pn_wouldblock(pn_io_t *io) { return io->wouldblock; } + +pn_selector_t *pn_io_selector(pn_io_t *io) +{ + if (io->selector == NULL) + io->selector = pni_selector(); + return io->selector; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/posix/selector.c ---------------------------------------------------------------------- diff --git a/proton-c/src/posix/selector.c b/proton-c/src/posix/selector.c index 1c42d99..14a97ee 100644 --- a/proton-c/src/posix/selector.c +++ b/proton-c/src/posix/selector.c @@ -65,9 +65,9 @@ void pn_selector_finalize(void *obj) #define pn_selector_compare NULL #define pn_selector_inspect NULL -pn_selector_t *pn_selector(void) +pn_selector_t *pni_selector(void) { - static pn_class_t clazz = PN_CLASS(pn_selector); + static const pn_class_t clazz = PN_CLASS(pn_selector); pn_selector_t *selector = (pn_selector_t *) pn_new(sizeof(pn_selector_t), &clazz); return selector; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/protocol.h.py ---------------------------------------------------------------------- diff --git a/proton-c/src/protocol.h.py b/proton-c/src/protocol.h.py index 0ff7aa4..76a2391 100644 --- a/proton-c/src/protocol.h.py +++ b/proton-c/src/protocol.h.py @@ -24,6 +24,7 @@ print "/* generated */" print "#ifndef _PROTON_PROTOCOL_H" print "#define _PROTON_PROTOCOL_H 1" print +print "#include \"proton/type_compat.h\"" fields = {} @@ -45,36 +46,99 @@ for type in TYPES: fields[code] = (type["@name"], [f["@name"] for f in type.query["field"]]) idx += 1 -print +print """ +#include <stddef.h> + +typedef struct { + const unsigned char name_index; + const unsigned char first_field_index; + const unsigned char field_count; +} pn_fields_t; -print """typedef struct { - const char *name; - const char *fields[32]; -} pn_fields_t;""" +extern const pn_fields_t FIELDS[]; +extern const char * const FIELD_STRINGPOOL; +extern const uint16_t FIELD_NAME[]; +extern const uint16_t FIELD_FIELDS[]; +extern const unsigned char FIELD_MIN; +extern const unsigned char FIELD_MAX; +""" +print "#ifdef DEFINE_FIELDS" + +print 'struct FIELD_STRINGS {' +print ' const char FIELD_STRINGS_NULL[sizeof("")];' +strings = set() +for name, fnames in fields.values(): + strings.add(name) + strings.update(fnames) +for str in strings: + istr = str.replace("-", "_"); + print ' const char FIELD_STRINGS_%s[sizeof("%s")];' % (istr, str) +print "};" +print +print 'const struct FIELD_STRINGS FIELD_STRINGS = {' +print ' "",' +for str in strings: + print ' "%s",'% str +print "};" +print 'const char * const FIELD_STRINGPOOL = (const char * const) &FIELD_STRINGS;' print +print "/* This is an array of offsets into FIELD_STRINGPOOL */" +print "const uint16_t FIELD_NAME[] = {" +print " offsetof(struct FIELD_STRINGS, FIELD_STRINGS_NULL)," +index = 1 +for i in range(256): + if i in fields: + name, fnames = fields[i] + iname = name.replace("-", "_"); + print ' offsetof(struct FIELD_STRINGS, FIELD_STRINGS_%s), /* %d */' % (iname, index) + index += 1 +print "};" -print "#ifndef DEFINE_FIELDS" -print "extern" -print "#endif" +print "/* This is an array of offsets into FIELD_STRINGPOOL */" +print "const uint16_t FIELD_FIELDS[] = {" +print " offsetof(struct FIELD_STRINGS, FIELD_STRINGS_NULL)," +index = 1 +for i in range(256): + if i in fields: + name, fnames = fields[i] + if fnames: + for f in fnames: + ifname = f.replace("-", "_"); + print ' offsetof(struct FIELD_STRINGS, FIELD_STRINGS_%s), /* %d (%s) */' % (ifname, index, name) + index += 1 +print "};" -print "pn_fields_t FIELDS[256]" -print "#ifdef DEFINE_FIELDS" -print " = {" +print "const pn_fields_t FIELDS[] = {" +name_count = 1 +field_count = 1 +field_min = 256 +field_max = 0 for i in range(256): if i in fields: + if i>field_max: field_max = i + if i<field_min: field_min = i + +for i in range(field_min, field_max+1): + if i in fields: name, fnames = fields[i] if fnames: - print ' {"%s", {%s}},' % (name, ", ".join(['"%s"' % f for f in fnames])) + print ' {%d, %d, %d}, /* %d (%s) */' % (name_count, field_count, len(fnames), i, name) + field_count += len(fnames) else: - print ' {"%s", {NULL}},' % name + print ' {%d, 0, 0}, /* %d (%s) */' % (name_count, i, name) + name_count += 1 + if i>field_max: field_max = i + if i<field_min: field_min = i else: - print ' {NULL, {NULL}},' + print ' {0, 0, 0}, /* %d */' % i -print "}" +print "};" +print +print 'const unsigned char FIELD_MIN = %d;' % field_min +print 'const unsigned char FIELD_MAX = %d;' % field_max +print print "#endif" -print ";" - print print "#endif /* protocol.h */" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/proton-dump.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proton-dump.c b/proton-c/src/proton-dump.c index c7216c3..4e8a04f 100644 --- a/proton-c/src/proton-dump.c +++ b/proton-c/src/proton-dump.c @@ -19,6 +19,8 @@ * */ +#include "pncompat/misc_funcs.inc" + #include <stdio.h> #include <proton/buffer.h> #include <proton/codec.h> @@ -98,8 +100,34 @@ int dump(const char *file) return 0; } +void usage(char* prog) { + printf("Usage: %s [FILE1] [FILEn] ...\n", prog); + printf("Displays the content of an AMQP dump file containing frame data.\n"); + printf("\n [FILEn] Dump file to be displayed.\n\n"); +} + int main(int argc, char **argv) { + if(argc == 1) { + usage(argv[0]); + return 0; + } + + int c; + + while ( (c = getopt(argc, argv, "h")) != -1 ) { + switch(c) { + case 'h': + usage(argv[0]); + return 0; + break; + + case '?': + usage(argv[0]); + return 1; + } + } + for (int i = 1; i < argc; i++) { int err = dump(argv[i]); if (err) return err; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/proton.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proton.c b/proton-c/src/proton.c index 9cd44ef..2b7d313 100644 --- a/proton-c/src/proton.c +++ b/proton-c/src/proton.c @@ -22,12 +22,12 @@ #if defined(_WIN32) && ! defined(__CYGWIN__) #define NOGDI #include <winsock2.h> -#include "pncompat/misc_funcs.inc" #else #include <unistd.h> -#include <libgen.h> #endif +#include "pncompat/misc_funcs.inc" + #include <stdio.h> #include <string.h> #include <proton/driver.h> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/sasl/sasl.c ---------------------------------------------------------------------- diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c index 3f4f536..9c7ba1e 100644 --- a/proton-c/src/sasl/sasl.c +++ b/proton-c/src/sasl/sasl.c @@ -19,6 +19,7 @@ * */ +#include <assert.h> #include <stdlib.h> #include <stdio.h> #include <string.h> @@ -27,29 +28,28 @@ #include <proton/engine.h> // XXX: just needed for PN_EOS #include <proton/sasl.h> #include "protocol.h" +#include "dispatch_actions.h" #include "../dispatcher/dispatcher.h" #include "../engine/engine-internal.h" #include "../util.h" -#define SCRATCH (1024) struct pn_sasl_t { pn_transport_t *transport; pn_io_layer_t *io_layer; - size_t header_count; pn_dispatcher_t *disp; - bool client; - bool configured; char *mechanisms; char *remote_mechanisms; pn_buffer_t *send_data; pn_buffer_t *recv_data; pn_sasl_outcome_t outcome; + bool client; + bool configured; + bool allow_skip; bool sent_init; bool rcvd_init; bool sent_done; bool rcvd_done; - char scratch[SCRATCH]; }; static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available); @@ -57,12 +57,6 @@ static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, si static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t available); static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t available); -int pn_do_init(pn_dispatcher_t *disp); -int pn_do_mechanisms(pn_dispatcher_t *disp); -int pn_do_challenge(pn_dispatcher_t *disp); -int pn_do_response(pn_dispatcher_t *disp); -int pn_do_outcome(pn_dispatcher_t *disp); - pn_sasl_t *pn_sasl(pn_transport_t *transport) { if (!transport->sasl) { @@ -70,12 +64,6 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport) sasl->disp = pn_dispatcher(1, transport); sasl->disp->batch = false; - pn_dispatcher_action(sasl->disp, SASL_INIT, pn_do_init); - pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, pn_do_mechanisms); - pn_dispatcher_action(sasl->disp, SASL_CHALLENGE, pn_do_challenge); - pn_dispatcher_action(sasl->disp, SASL_RESPONSE, pn_do_response); - pn_dispatcher_action(sasl->disp, SASL_OUTCOME, pn_do_outcome); - sasl->client = false; sasl->configured = false; sasl->mechanisms = NULL; @@ -83,6 +71,7 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport) sasl->send_data = pn_buffer(16); sasl->recv_data = pn_buffer(16); sasl->outcome = PN_SASL_NONE; + sasl->allow_skip = false; sasl->sent_init = false; sasl->rcvd_init = false; sasl->sent_done = false; @@ -95,8 +84,6 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport) sasl->io_layer->process_input = pn_input_read_sasl_header; sasl->io_layer->process_output = pn_output_write_sasl_header; sasl->io_layer->process_tick = pn_io_layer_tick_passthru; - - sasl->header_count = 0; } return transport->sasl; @@ -185,6 +172,12 @@ void pn_sasl_server(pn_sasl_t *sasl) } } +void pn_sasl_allow_skip(pn_sasl_t *sasl, bool allow) +{ + if (sasl) + sasl->allow_skip = allow; +} + void pn_sasl_plain(pn_sasl_t *sasl, const char *username, const char *password) { if (!sasl) return; @@ -238,7 +231,7 @@ void pn_sasl_free(pn_sasl_t *sasl) void pn_client_init(pn_sasl_t *sasl) { - pn_bytes_t bytes = pn_buffer_bytes(sasl->send_data); + pn_buffer_memory_t bytes = pn_buffer_memory(sasl->send_data); pn_post_frame(sasl->disp, 0, "DL[sz]", SASL_INIT, sasl->mechanisms, bytes.size, bytes.start); pn_buffer_clear(sasl->send_data); @@ -294,7 +287,7 @@ void pn_sasl_process(pn_sasl_t *sasl) } if (pn_buffer_size(sasl->send_data)) { - pn_bytes_t bytes = pn_buffer_bytes(sasl->send_data); + pn_buffer_memory_t bytes = pn_buffer_memory(sasl->send_data); pn_post_frame(sasl->disp, 0, "DL[z]", sasl->client ? SASL_RESPONSE : SASL_CHALLENGE, bytes.size, bytes.start); pn_buffer_clear(sasl->send_data); @@ -408,27 +401,44 @@ int pn_do_outcome(pn_dispatcher_t *disp) } #define SASL_HEADER ("AMQP\x03\x01\x00\x00") +#define AMQP_HEADER ("AMQP\x00\x01\x00\x00") #define SASL_HEADER_LEN 8 static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available) { pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context; - const char *point = SASL_HEADER + sasl->header_count; - int delta = pn_min(available, SASL_HEADER_LEN - sasl->header_count); - if (!available || memcmp(bytes, point, delta)) { - char quoted[1024]; - pn_quote_data(quoted, 1024, bytes, available); - return pn_error_format(sasl->transport->error, PN_ERR, - "%s header mismatch: '%s'", "SASL", quoted); - } else { - sasl->header_count += delta; - if (sasl->header_count == SASL_HEADER_LEN) { - sasl->io_layer->process_input = pn_input_read_sasl; - if (sasl->disp->trace & PN_TRACE_FRM) - pn_transport_logf(sasl->transport, " <- %s", "SASL"); + if (available > 0) { + if (available < SASL_HEADER_LEN) { + if (memcmp(bytes, SASL_HEADER, available) == 0 || + memcmp(bytes, AMQP_HEADER, available) == 0) + return 0; + } else { + if (memcmp(bytes, SASL_HEADER, SASL_HEADER_LEN) == 0) { + sasl->io_layer->process_input = pn_input_read_sasl; + if (sasl->disp->trace & PN_TRACE_FRM) + pn_transport_logf(sasl->transport, " <- %s", "SASL"); + return SASL_HEADER_LEN; + } + if (memcmp(bytes, AMQP_HEADER, SASL_HEADER_LEN) == 0) { + if (sasl->allow_skip) { + sasl->outcome = PN_SASL_SKIPPED; + sasl->io_layer->process_input = pn_io_layer_input_passthru; + sasl->io_layer->process_output = pn_io_layer_output_passthru; + pn_io_layer_t *io_next = sasl->io_layer->next; + return io_next->process_input( io_next, bytes, available ); + } else { + pn_do_error(sasl->transport, "amqp:connection:policy-error", + "Client skipped SASL exchange - forbidden"); + return PN_EOS; + } + } } - return delta; } + char quoted[1024]; + pn_quote_data(quoted, 1024, bytes, available); + pn_do_error(sasl->transport, "amqp:connection:framing-error", + "%s header mismatch: '%s'", "SASL", quoted); + return PN_EOS; } static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available) @@ -448,13 +458,10 @@ static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context; if (sasl->disp->trace & PN_TRACE_FRM) pn_transport_logf(sasl->transport, " -> %s", "SASL"); - if (size >= SASL_HEADER_LEN) { - memmove(bytes, SASL_HEADER, SASL_HEADER_LEN); - sasl->io_layer->process_output = pn_output_write_sasl; - return SASL_HEADER_LEN; - } else { - return pn_error_format(sasl->transport->error, PN_UNDERFLOW, "underflow writing %s header", "SASL"); - } + assert(size >= SASL_HEADER_LEN); + memmove(bytes, SASL_HEADER, SASL_HEADER_LEN); + sasl->io_layer->process_output = pn_output_write_sasl; + return SASL_HEADER_LEN; } static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/selectable.c ---------------------------------------------------------------------- diff --git a/proton-c/src/selectable.c b/proton-c/src/selectable.c index 97283b4..1f39a38 100644 --- a/proton-c/src/selectable.c +++ b/proton-c/src/selectable.c @@ -90,7 +90,7 @@ pn_selectable_t *pni_selectable(ssize_t (*capacity)(pn_selectable_t *), void (*expired)(pn_selectable_t *), void (*finalize)(pn_selectable_t *)) { - static pn_class_t clazz = PN_CLASS(pn_selectable); + static const pn_class_t clazz = PN_CLASS(pn_selectable); pn_selectable_t *selectable = (pn_selectable_t *) pn_new(sizeof(pn_selectable_t), &clazz); selectable->capacity = capacity; selectable->pending = pending; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/ssl/openssl.c ---------------------------------------------------------------------- diff --git a/proton-c/src/ssl/openssl.c b/proton-c/src/ssl/openssl.c index 5815845..db3af4c 100644 --- a/proton-c/src/ssl/openssl.c +++ b/proton-c/src/ssl/openssl.c @@ -49,23 +49,25 @@ typedef enum { UNKNOWN_CONNECTION, SSL_CONNECTION, CLEAR_CONNECTION } connection typedef struct pn_ssl_session_t pn_ssl_session_t; struct pn_ssl_domain_t { - int ref_count; SSL_CTX *ctx; - pn_ssl_mode_t mode; - bool has_ca_db; // true when CA database configured - bool has_certificate; // true when certificate configured char *keyfile_pw; // settings used for all connections char *trusted_CAs; - pn_ssl_verify_mode_t verify_mode; - bool allow_unsecured; // session cache pn_ssl_session_t *ssn_cache_head; pn_ssl_session_t *ssn_cache_tail; + + int ref_count; + pn_ssl_mode_t mode; + pn_ssl_verify_mode_t verify_mode; + + bool has_ca_db; // true when CA database configured + bool has_certificate; // true when certificate configured + bool allow_unsecured; }; @@ -81,24 +83,25 @@ struct pn_ssl_t { BIO *bio_ssl; // i/o from/to SSL socket layer BIO *bio_ssl_io; // SSL "half" of network-facing BIO BIO *bio_net_io; // socket-side "half" of network-facing BIO - bool ssl_shutdown; // BIO_ssl_shutdown() called on socket. - bool ssl_closed; // shutdown complete, or SSL error - ssize_t app_input_closed; // error code returned by upper layer process input - ssize_t app_output_closed; // error code returned by upper layer process output - - bool read_blocked; // SSL blocked until more network data is read - bool write_blocked; // SSL blocked until data is written to network - // buffers for holding I/O from "applications" above SSL #define APP_BUF_SIZE (4*1024) char *outbuf; + char *inbuf; + + ssize_t app_input_closed; // error code returned by upper layer process input + ssize_t app_output_closed; // error code returned by upper layer process output + size_t out_size; size_t out_count; - char *inbuf; size_t in_size; size_t in_count; pn_trace_t trace; + + bool ssl_shutdown; // BIO_ssl_shutdown() called on socket. + bool ssl_closed; // shutdown complete, or SSL error + bool read_blocked; // SSL blocked until more network data is read + bool write_blocked; // SSL blocked until data is written to network }; struct pn_ssl_session_t { @@ -184,7 +187,7 @@ static int ssl_failed(pn_ssl_t *ssl) { SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN); ssl->ssl_closed = true; - ssl->app_input_closed = ssl->app_output_closed = PN_ERR; + ssl->app_input_closed = ssl->app_output_closed = PN_EOS; // fake a shutdown so the i/o processing code will close properly SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN); // try to grab the first SSL error to add to the failure log @@ -195,7 +198,8 @@ static int ssl_failed(pn_ssl_t *ssl) } _log_ssl_error(NULL); // spit out any remaining errors to the log file ssl->transport->tail_closed = true; - return pn_error_format( ssl->transport->error, PN_ERR, "SSL Failure: %s", buf ); + pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", buf); + return PN_EOS; } /* match the DNS name pattern from the peer certificate against our configured peer @@ -343,7 +347,7 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx) // "openssl dhparam -C -2 2048" static DH *get_dh2048(void) { - static unsigned char dh2048_p[]={ + static const unsigned char dh2048_p[]={ 0xAE,0xF7,0xE9,0x66,0x26,0x7A,0xAC,0x0A,0x6F,0x1E,0xCD,0x81, 0xBD,0x0A,0x10,0x7E,0xFA,0x2C,0xF5,0x2D,0x98,0xD4,0xE7,0xD9, 0xE4,0x04,0x8B,0x06,0x85,0xF2,0x0B,0xA3,0x90,0x15,0x56,0x0C, @@ -367,7 +371,7 @@ static DH *get_dh2048(void) 0xA4,0xED,0xFD,0x49,0x0B,0xE3,0x4A,0xF6,0x28,0xB3,0x98,0xB0, 0x23,0x1C,0x09,0x33, }; - static unsigned char dh2048_g[]={ + static const unsigned char dh2048_g[]={ 0x02, }; DH *dh; @@ -807,7 +811,7 @@ static int setup_ssl_connection( pn_ssl_t *ssl ) static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t available) { pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; - if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR; + if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS; _log( ssl, "process_input_ssl( data size=%d )\n",available ); @@ -904,14 +908,13 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat if (!max_frame) max_frame = ssl->in_size * 2; // no limit if (ssl->in_size < max_frame) { // no max frame limit - grow it. - char *newbuf = (char *)malloc( max_frame ); + size_t newsize = pn_min(max_frame, ssl->in_size * 2); + char *newbuf = (char *)realloc( ssl->inbuf, newsize ); if (newbuf) { - ssl->in_size = max_frame; - memmove( newbuf, ssl->inbuf, ssl->in_count ); - free( ssl->inbuf ); + ssl->in_size = newsize; ssl->inbuf = newbuf; + work_pending = true; // can we get more input? } - work_pending = true; // can we get more input? } else { // can't gather any more input, but app needs more? // This is a bug - since SSL can buffer up to max-frame, @@ -951,8 +954,8 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len) { pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; - if (!ssl) return PN_ERR; - if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR; + if (!ssl) return PN_EOS; + if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS; ssize_t written = 0; bool work_pending; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/tests/object.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/object.c b/proton-c/src/tests/object.c index 228370f..f6d11cf 100644 --- a/proton-c/src/tests/object.c +++ b/proton-c/src/tests/object.c @@ -95,7 +95,7 @@ static intptr_t delta(void *a, void *b) { return (uintptr_t) b - (uintptr_t) a; static pn_class_t null_class = {0}; -static pn_class_t noop_class = {noop, noop, zero, delta}; +static pn_class_t noop_class = {NULL, noop, noop, zero, delta}; static void test_new(size_t size, pn_class_t *clazz) { @@ -119,7 +119,7 @@ static void finalizer(void *object) static void test_finalize(void) { - static pn_class_t clazz = {NULL, finalizer}; + static pn_class_t clazz = {NULL, NULL, finalizer}; int **obj = (int **) pn_new(sizeof(int **), &clazz); assert(obj); @@ -141,7 +141,7 @@ static uintptr_t hashcode(void *obj) { return (uintptr_t) obj; } static void test_hashcode(void) { - static pn_class_t clazz = {NULL, NULL, hashcode}; + static pn_class_t clazz = {NULL, NULL, NULL, hashcode}; void *obj = pn_new(0, &clazz); assert(obj); assert(pn_hashcode(obj) == (uintptr_t) obj); @@ -151,7 +151,7 @@ static void test_hashcode(void) static void test_compare(void) { - static pn_class_t clazz = {NULL, NULL, NULL, delta}; + static pn_class_t clazz = {NULL, NULL, NULL, NULL, delta}; void *a = pn_new(0, &clazz); assert(a); @@ -499,6 +499,50 @@ static void test_hash(void) pn_decref(three); } + +// collider class: all objects have same hash, no two objects compare equal +static intptr_t collider_compare(void *a, void *b) +{ + if (a == b) return 0; + return (a > b) ? 1 : -1; +} + +static uintptr_t collider_hashcode(void *obj) +{ + return 23; +} + +#define collider_initialize NULL +#define collider_finalize NULL +#define collider_inspect NULL + +static void test_map_links(void) +{ + const pn_class_t collider_clazz = PN_CLASS(collider); + void *keys[3]; + for (int i = 0; i < 3; i++) + keys[i] = pn_new(0, &collider_clazz); + + // test deleting a head, middle link, tail + + for (int delete_idx=0; delete_idx < 3; delete_idx++) { + pn_map_t *map = pn_map(0, 0.75, 0); + // create a chain of entries that have same head (from identical key hashcode) + for (int i = 0; i < 3; i++) { + pn_map_put(map, keys[i], keys[i]); + } + pn_map_del(map, keys[delete_idx]); + for (int i = 0; i < 3; i++) { + void *value = (i == delete_idx) ? NULL : keys[i]; + assert (pn_map_get(map, keys[i]) == value); + } + pn_free(map); + } + for (int i = 0; i < 3; i++) + pn_free(keys[i]); +} + + static bool equals(const char *a, const char *b) { if (a == NULL && b == NULL) { @@ -792,6 +836,7 @@ int main(int argc, char **argv) test_list_index(); test_map(); + test_map_links(); test_hash(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/tests/parse-url.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/parse-url.c b/proton-c/src/tests/parse-url.c index 4489ab2..f57e739 100644 --- a/proton-c/src/tests/parse-url.c +++ b/proton-c/src/tests/parse-url.c @@ -64,6 +64,8 @@ static bool test_url_parse(const char* url0, const char* scheme0, const char* us int main(int argc, char **argv) { + assert(test_url_parse("", 0, 0, 0, "", 0, 0)); + assert(test_url_parse("/Foo.bar:90087@somewhere", 0, 0, 0, "", 0, "Foo.bar:90087@somewhere")); assert(test_url_parse("host", 0, 0, 0, "host", 0, 0)); assert(test_url_parse("host:423", 0, 0, 0, "host", "423", 0)); assert(test_url_parse("user@host", 0, "user", 0, "host", 0, 0)); @@ -96,5 +98,10 @@ int main(int argc, char **argv) assert(test_url_parse("us%2fer:password@host", 0, "us/er", "password", "host", 0, 0)); assert(test_url_parse("us%2Fer:password@host", 0, "us/er", "password", "host", 0, 0)); assert(test_url_parse("user:pass%2fword%@host", 0, "user", "pass/word%", "host", 0, 0)); + assert(test_url_parse("localhost/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", 0, 0, 0, "localhost", 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0")); + assert(test_url_parse("/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", 0, 0, 0, "", 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0")); + assert(test_url_parse("amqp://localhost/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", "amqp", 0, 0, "localhost", 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0")); + // Really perverse url + assert(test_url_parse("://:@://:", "", "", "", "", "", "/:")); return 0; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c index e5b4a31..1ac2876 100644 --- a/proton-c/src/transport/transport.c +++ b/proton-c/src/transport/transport.c @@ -24,6 +24,7 @@ #include <string.h> #include <proton/framing.h> #include "protocol.h" +#include "dispatch_actions.h" #include <assert.h> #include <stdarg.h> @@ -42,7 +43,7 @@ static ssize_t transport_consume(pn_transport_t *transport); void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next) { - db->deliveries = pn_hash(1024, 0.75, PN_REFCOUNT); + db->deliveries = pn_hash(0, 0.75, PN_REFCOUNT); db->next = next; } @@ -92,16 +93,6 @@ void pn_delivery_map_clear(pn_delivery_map_t *dm) } } -int pn_do_open(pn_dispatcher_t *disp); -int pn_do_begin(pn_dispatcher_t *disp); -int pn_do_attach(pn_dispatcher_t *disp); -int pn_do_transfer(pn_dispatcher_t *disp); -int pn_do_flow(pn_dispatcher_t *disp); -int pn_do_disposition(pn_dispatcher_t *disp); -int pn_do_detach(pn_dispatcher_t *disp); -int pn_do_end(pn_dispatcher_t *disp); -int pn_do_close(pn_dispatcher_t *disp); - static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available); static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available); static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t available); @@ -150,16 +141,6 @@ static void pn_transport_initialize(void *object) amqp->buffered_input = NULL; amqp->next = NULL; - pn_dispatcher_action(transport->disp, OPEN, pn_do_open); - pn_dispatcher_action(transport->disp, BEGIN, pn_do_begin); - pn_dispatcher_action(transport->disp, ATTACH, pn_do_attach); - pn_dispatcher_action(transport->disp, TRANSFER, pn_do_transfer); - pn_dispatcher_action(transport->disp, FLOW, pn_do_flow); - pn_dispatcher_action(transport->disp, DISPOSITION, pn_do_disposition); - pn_dispatcher_action(transport->disp, DETACH, pn_do_detach); - pn_dispatcher_action(transport->disp, END, pn_do_end); - pn_dispatcher_action(transport->disp, CLOSE, pn_do_close); - transport->open_sent = false; transport->open_rcvd = false; transport->close_sent = false; @@ -178,11 +159,10 @@ static void pn_transport_initialize(void *object) transport->remote_idle_timeout = 0; transport->keepalive_deadline = 0; transport->last_bytes_output = 0; - transport->remote_offered_capabilities = pn_data(16); - transport->remote_desired_capabilities = pn_data(16); - transport->remote_properties = pn_data(16); - transport->disp_data = pn_data(16); - transport->error = pn_error(); + transport->remote_offered_capabilities = pn_data(0); + transport->remote_desired_capabilities = pn_data(0); + transport->remote_properties = pn_data(0); + transport->disp_data = pn_data(0); pn_condition_init(&transport->remote_condition); transport->local_channels = pn_hash(0, 0.75, PN_REFCOUNT); @@ -193,6 +173,8 @@ static void pn_transport_initialize(void *object) transport->input_pending = 0; transport->output_pending = 0; + + transport->done_processing = false; } pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel) @@ -200,14 +182,20 @@ pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel) return (pn_session_t *) pn_hash_get(transport->remote_channels, channel); } -static void pn_map_channel(pn_transport_t *transport, uint16_t channel, pn_session_t *session) +static void pni_map_remote_channel(pn_session_t *session, uint16_t channel) { + pn_transport_t *transport = session->connection->transport; pn_hash_put(transport->remote_channels, channel, session); session->state.remote_channel = channel; } -void pn_unmap_channel(pn_transport_t *transport, pn_session_t *ssn) +void pni_transport_unbind_handles(pn_hash_t *handles); + +static void pni_unmap_remote_channel(pn_session_t *ssn) { + // XXX: should really update link state also + pni_transport_unbind_handles(ssn->state.remote_handles); + pn_transport_t *transport = ssn->connection->transport; uint16_t channel = ssn->state.remote_channel; ssn->state.remote_channel = -2; // note: may free the session: @@ -222,7 +210,7 @@ static void pn_transport_finalize(void *object); pn_transport_t *pn_transport() { - static pn_class_t clazz = PN_CLASS(pn_transport); + static const pn_class_t clazz = PN_CLASS(pn_transport); pn_transport_t *transport = (pn_transport_t *) pn_new(sizeof(pn_transport_t), &clazz); if (!transport) return NULL; @@ -265,7 +253,6 @@ static void pn_transport_finalize(void *object) pn_free(transport->remote_desired_capabilities); pn_free(transport->remote_properties); pn_free(transport->disp_data); - pn_error_free(transport->error); pn_condition_tini(&transport->remote_condition); pn_free(transport->local_channels); pn_free(transport->remote_channels); @@ -281,22 +268,35 @@ int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection) if (connection->transport) return PN_STATE_ERR; transport->connection = connection; connection->transport = transport; - pn_incref(connection); + pn_incref2(connection, transport); if (transport->open_rcvd) { PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE); - pn_event_t *event = pn_collector_put(connection->collector, - PN_CONNECTION_REMOTE_STATE); - if (event) { - pn_event_init_connection(event, connection); - } - if (!pn_error_code(transport->error)) { - transport->disp->halt = false; - transport_consume(transport); // blech - testBindAfterOpen - } + pn_collector_put(connection->collector, PN_CONNECTION_REMOTE_OPEN, connection); + transport->disp->halt = false; + transport_consume(transport); // blech - testBindAfterOpen } return 0; } +void pni_transport_unbind_handles(pn_hash_t *handles) +{ + for (pn_handle_t h = pn_hash_head(handles); h; h = pn_hash_next(handles, h)) { + uintptr_t key = pn_hash_key(handles, h); + pn_hash_del(handles, key); + } +} + +void pni_transport_unbind_channels(pn_hash_t *channels) +{ + for (pn_handle_t h = pn_hash_head(channels); h; h = pn_hash_next(channels, h)) { + uintptr_t key = pn_hash_key(channels, h); + pn_session_t *ssn = (pn_session_t *) pn_hash_value(channels, h); + pni_transport_unbind_handles(ssn->state.local_handles); + pni_transport_unbind_handles(ssn->state.remote_handles); + pn_hash_del(channels, key); + } +} + int pn_transport_unbind(pn_transport_t *transport) { assert(transport); @@ -305,6 +305,7 @@ int pn_transport_unbind(pn_transport_t *transport) pn_connection_t *conn = transport->connection; transport->connection = NULL; + // XXX: what happens if the endpoints are freed before we get here? pn_session_t *ssn = pn_session_head(conn, 0); while (ssn) { pn_delivery_map_clear(&ssn->state.incoming); @@ -319,28 +320,31 @@ int pn_transport_unbind(pn_transport_t *transport) endpoint = endpoint->endpoint_next; } + pni_transport_unbind_channels(transport->local_channels); + pni_transport_unbind_channels(transport->remote_channels); + pn_connection_unbound(conn); - pn_decref(conn); + pn_decref2(conn, transport); return 0; } pn_error_t *pn_transport_error(pn_transport_t *transport) { - return transport->error; + return NULL; } -static void pn_map_handle(pn_session_t *ssn, uint32_t handle, pn_link_t *link) +static void pni_map_remote_handle(pn_link_t *link, uint32_t handle) { link->state.remote_handle = handle; - pn_hash_put(ssn->state.remote_handles, handle, link); + pn_hash_put(link->session->state.remote_handles, handle, link); } -void pn_unmap_handle(pn_session_t *ssn, pn_link_t *link) +static void pni_unmap_remote_handle(pn_link_t *link) { - uint32_t handle = link->state.remote_handle; + uintptr_t handle = link->state.remote_handle; link->state.remote_handle = -2; // may delete link: - pn_hash_del(ssn->state.remote_handles, handle); + pn_hash_del(link->session->state.remote_handles, handle); } pn_link_t *pn_handle_state(pn_session_t *ssn, uint32_t handle) @@ -392,13 +396,12 @@ void pni_disposition_encode(pn_disposition_t *disposition, pn_data_t *data) } } -int pn_post_close(pn_transport_t *transport, const char *condition) +int pn_post_close(pn_transport_t *transport, const char *condition, const char *description) { pn_condition_t *cond = NULL; if (transport->connection) { cond = pn_connection_condition(transport->connection); } - const char *description = NULL; pn_data_t *info = NULL; if (!condition && pn_condition_is_set(cond)) { condition = pn_condition_get_name(cond); @@ -418,13 +421,16 @@ int pn_do_error(pn_transport_t *transport, const char *condition, const char *fm // XXX: result vsnprintf(buf, 1024, fmt, ap); va_end(ap); - pn_error_set(transport->error, PN_ERR, buf); if (!transport->close_sent) { - pn_post_close(transport, condition); + if (!transport->open_sent) { + pn_post_frame(transport->disp, 0, "DL[S]", OPEN, ""); + } + + pn_post_close(transport, condition, buf); transport->close_sent = true; } transport->disp->halt = true; - pn_transport_logf(transport, "ERROR %s %s", condition, pn_error_text(transport->error)); + pn_transport_logf(transport, "ERROR %s %s", condition, buf); return PN_ERR; } @@ -459,7 +465,6 @@ int pn_do_open(pn_dispatcher_t *disp) } disp->remote_max_frame = transport->remote_max_frame; pn_buffer_clear( disp->frame ); - pn_buffer_ensure( disp->frame, disp->remote_max_frame ); } if (container_q) { transport->remote_container = pn_bytes_strdup(remote_container); @@ -474,12 +479,7 @@ int pn_do_open(pn_dispatcher_t *disp) if (conn) { PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE); - - pn_event_t *event = pn_collector_put(conn->collector, - PN_CONNECTION_REMOTE_STATE); - if (event) { - pn_event_init_connection(event, conn); - } + pn_collector_put(conn->collector, PN_CONNECTION_REMOTE_OPEN, conn); } else { transport->disp->halt = true; } @@ -506,15 +506,9 @@ int pn_do_begin(pn_dispatcher_t *disp) ssn = pn_session(transport->connection); } ssn->state.incoming_transfer_count = next; - pn_map_channel(transport, disp->channel, ssn); + pni_map_remote_channel(ssn, disp->channel); PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_ACTIVE); - - pn_event_t *event = pn_collector_put(transport->connection->collector, - PN_SESSION_REMOTE_STATE); - if (event) { - pn_event_init_session(event, ssn); - } - + pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_OPEN, ssn); return 0; } @@ -537,18 +531,18 @@ pn_link_t *pn_find_link(pn_session_t *ssn, pn_bytes_t name, bool is_sender) static pn_expiry_policy_t symbol2policy(pn_bytes_t symbol) { if (!symbol.start) - return PN_SESSION_CLOSE; + return PN_EXPIRE_WITH_SESSION; if (!strncmp(symbol.start, "link-detach", symbol.size)) - return PN_LINK_CLOSE; + return PN_EXPIRE_WITH_LINK; if (!strncmp(symbol.start, "session-end", symbol.size)) - return PN_SESSION_CLOSE; + return PN_EXPIRE_WITH_SESSION; if (!strncmp(symbol.start, "connection-close", symbol.size)) - return PN_CONNECTION_CLOSE; + return PN_EXPIRE_WITH_CONNECTION; if (!strncmp(symbol.start, "never", symbol.size)) - return PN_NEVER; + return PN_EXPIRE_NEVER; - return PN_SESSION_CLOSE; + return PN_EXPIRE_WITH_SESSION; } static pn_distribution_mode_t symbol2dist_mode(const pn_bytes_t symbol) @@ -613,6 +607,10 @@ int pn_do_attach(pn_dispatcher_t *disp) strname[name.size] = '\0'; pn_session_t *ssn = pn_channel_state(transport, disp->channel); + if (!ssn) { + pn_do_error(transport, "amqp:connection:no-session", "attach without a session"); + return PN_EOS; + } pn_link_t *link = pn_find_link(ssn, name, is_sender); if (!link) { if (is_sender) { @@ -626,7 +624,7 @@ int pn_do_attach(pn_dispatcher_t *disp) free(strheap); } - pn_map_handle(ssn, handle, link); + pni_map_remote_handle(link, handle); PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE); pn_terminus_t *rsrc = &link->remote_source; if (source.start || src_dynamic) { @@ -684,12 +682,7 @@ int pn_do_attach(pn_dispatcher_t *disp) link->state.delivery_count = idc; } - pn_event_t *event = pn_collector_put(transport->connection->collector, - PN_LINK_REMOTE_STATE); - if (event) { - pn_event_init_link(event, link); - } - + pn_collector_put(transport->connection->collector, PN_LINK_REMOTE_OPEN, link); return 0; } @@ -770,11 +763,7 @@ int pn_do_transfer(pn_dispatcher_t *disp) pn_post_flow(transport, ssn, link); } - pn_event_t *event = pn_collector_put(transport->connection->collector, PN_DELIVERY); - if (event) { - pn_event_init_delivery(event, delivery); - } - + pn_collector_put(transport->connection->collector, PN_DELIVERY, delivery); return 0; } @@ -824,10 +813,7 @@ int pn_do_flow(pn_dispatcher_t *disp) } } - pn_event_t *event = pn_collector_put(transport->connection->collector, PN_LINK_FLOW); - if (event) { - pn_event_init_link(event, link); - } + pn_collector_put(transport->connection->collector, PN_LINK_FLOW, link); } return 0; @@ -922,10 +908,7 @@ int pn_do_disposition(pn_dispatcher_t *disp) delivery->updated = true; pn_work_update(transport->connection, delivery); - pn_event_t *event = pn_collector_put(transport->connection->collector, PN_DELIVERY); - if (event) { - pn_event_init_delivery(event, delivery); - } + pn_collector_put(transport->connection->collector, PN_DELIVERY, delivery); } } @@ -955,16 +938,12 @@ int pn_do_detach(pn_dispatcher_t *disp) if (closed) { PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED); - pn_event_t *event = pn_collector_put(transport->connection->collector, - PN_LINK_REMOTE_STATE); - if (event) { - pn_event_init_link(event, link); - } + pn_collector_put(transport->connection->collector, PN_LINK_REMOTE_CLOSE, link); } else { // TODO: implement } - pn_unmap_handle(ssn, link); + pni_unmap_remote_handle(link); return 0; } @@ -975,12 +954,8 @@ int pn_do_end(pn_dispatcher_t *disp) int err = pn_scan_error(disp->args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT); if (err) return err; PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED); - pn_event_t *event = pn_collector_put(transport->connection->collector, - PN_SESSION_REMOTE_STATE); - if (event) { - pn_event_init_session(event, ssn); - } - pn_unmap_channel(transport, ssn); + pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_CLOSE, ssn); + pni_unmap_remote_channel(ssn); return 0; } @@ -992,11 +967,7 @@ int pn_do_close(pn_dispatcher_t *disp) if (err) return err; transport->close_rcvd = true; PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED); - pn_event_t *event = pn_collector_put(transport->connection->collector, - PN_CONNECTION_REMOTE_STATE); - if (event) { - pn_event_init_connection(event, conn); - } + pn_collector_put(transport->connection->collector, PN_CONNECTION_REMOTE_CLOSE, conn); return 0; } @@ -1043,11 +1014,7 @@ static ssize_t transport_consume(pn_transport_t *transport) } else if (n == 0) { break; } else { - if (n != PN_EOS) { - pn_transport_logf(transport, "ERROR[%i] %s\n", - pn_error_code(transport->error), - pn_error_text(transport->error)); - } + assert(n == PN_EOS); if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) pn_transport_log(transport, " <- EOS"); transport->input_pending = 0; // XXX ??? @@ -1071,9 +1038,10 @@ static ssize_t pn_input_read_header(pn_transport_t *transport, const char *bytes if (!available || memcmp(bytes, point, delta)) { char quoted[1024]; pn_quote_data(quoted, 1024, bytes, available); - return pn_error_format(transport->error, PN_ERR, - "%s header mismatch: '%s'%s", protocol, quoted, - available ? "" : " (connection aborted)"); + pn_do_error(transport, "amqp:connection:framing-error", + "%s header mismatch: '%s'%s", protocol, quoted, + available ? "" : " (connection aborted)"); + return PN_EOS; } else { transport->header_count += delta; if (transport->header_count == size) { @@ -1102,21 +1070,20 @@ static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, si if (transport->close_rcvd) { if (available > 0) { pn_do_error(transport, "amqp:connection:framing-error", "data after close"); - return PN_ERR; - } else { return PN_EOS; } } if (!available) { pn_do_error(transport, "amqp:connection:framing-error", "connection aborted"); - return PN_ERR; + return PN_EOS; } ssize_t n = pn_dispatcher_input(transport->disp, bytes, available); if (n < 0) { - return pn_error_set(transport->error, n, "dispatch error"); + //return pn_error_set(transport->error, n, "dispatch error"); + return PN_EOS; } else if (transport->close_rcvd) { return PN_EOS; } else { @@ -1230,6 +1197,15 @@ size_t pn_session_incoming_window(pn_session_t *ssn) } } +static void pni_map_local_channel(pn_session_t *ssn) +{ + pn_transport_t *transport = ssn->connection->transport; + pn_session_state_t *state = &ssn->state; + uint16_t channel = allocate_alias(transport->local_channels); + state->local_channel = channel; + pn_hash_put(transport->local_channels, channel, ssn); +} + int pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) { if (endpoint->type == SESSION && transport->open_sent) @@ -1238,16 +1214,14 @@ int pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) pn_session_state_t *state = &ssn->state; if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1) { - uint16_t channel = allocate_alias(transport->local_channels); + pni_map_local_channel(ssn); state->incoming_window = pn_session_incoming_window(ssn); state->outgoing_window = pn_session_outgoing_window(ssn); - pn_post_frame(transport->disp, channel, "DL[?HIII]", BEGIN, + pn_post_frame(transport->disp, state->local_channel, "DL[?HIII]", BEGIN, ((int16_t) state->remote_channel >= 0), state->remote_channel, state->outgoing_transfer_count, state->incoming_window, state->outgoing_window); - state->local_channel = channel; - pn_hash_put(transport->local_channels, channel, ssn); } } @@ -1258,18 +1232,25 @@ static const char *expiry_symbol(pn_expiry_policy_t policy) { switch (policy) { - case PN_LINK_CLOSE: + case PN_EXPIRE_WITH_LINK: return "link-detach"; - case PN_SESSION_CLOSE: + case PN_EXPIRE_WITH_SESSION: return NULL; - case PN_CONNECTION_CLOSE: + case PN_EXPIRE_WITH_CONNECTION: return "connection-close"; - case PN_NEVER: + case PN_EXPIRE_NEVER: return "never"; } return NULL; } +static void pni_map_local_handle(pn_link_t *link) { + pn_link_state_t *state = &link->state; + pn_session_state_t *ssn_state = &link->session->state; + state->local_handle = allocate_alias(ssn_state->local_handles); + pn_hash_put(ssn_state->local_handles, state->local_handle, link); +} + int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) { if (transport->open_sent && (endpoint->type == SENDER || @@ -1281,8 +1262,7 @@ int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) if (((int16_t) ssn_state->local_channel >= 0) && !(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1) { - state->local_handle = allocate_alias(ssn_state->local_handles); - pn_hash_put(ssn_state->local_handles, state->local_handle, link); + pni_map_local_handle(link); const pn_distribution_mode_t dist_mode = link->source.distribution_mode; int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH, @@ -1464,6 +1444,8 @@ int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery, link->queued--; link->session->outgoing_deliveries--; } + + pn_collector_put(transport->connection->collector, PN_LINK_FLOW, link); } } @@ -1573,6 +1555,14 @@ int pn_process_flow_sender(pn_transport_t *transport, pn_endpoint_t *endpoint) return 0; } +static void pni_unmap_local_handle(pn_link_t *link) { + pn_link_state_t *state = &link->state; + uintptr_t handle = state->local_handle; + state->local_handle = -2; + // may delete link + pn_hash_del(link->session->state.local_handles, handle); +} + int pn_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) { if (endpoint->type == SENDER || endpoint->type == RECEIVER) @@ -1601,8 +1591,7 @@ int pn_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io?DL[sSC]]", DETACH, state->local_handle, true, (bool) name, ERROR, name, description, info); if (err) return err; - pn_hash_del(ssn_state->local_handles, state->local_handle); - state->local_handle = -2; + pni_unmap_local_handle(link); } pn_clear_modified(transport->connection, endpoint); @@ -1634,6 +1623,17 @@ bool pn_pointful_buffering(pn_transport_t *transport, pn_session_t *session) return false; } +static void pni_unmap_local_channel(pn_session_t *ssn) { + // XXX: should really update link state also + pni_transport_unbind_handles(ssn->state.local_handles); + pn_transport_t *transport = ssn->connection->transport; + pn_session_state_t *state = &ssn->state; + uintptr_t channel = state->local_channel; + state->local_channel = -2; + // may delete session + pn_hash_del(transport->local_channels, channel); +} + int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) { if (endpoint->type == SESSION) @@ -1643,7 +1643,9 @@ int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0 && !transport->close_sent) { - if (pn_pointful_buffering(transport, session)) return 0; + if (pn_pointful_buffering(transport, session)) { + return 0; + } const char *name = NULL; const char *description = NULL; @@ -1658,8 +1660,7 @@ int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) int err = pn_post_frame(transport->disp, state->local_channel, "DL[?DL[sSC]]", END, (bool) name, ERROR, name, description, info); if (err) return err; - pn_hash_del(transport->local_channels, state->local_channel); - state->local_channel = -2; + pni_unmap_local_channel(session); } pn_clear_modified(transport->connection, endpoint); @@ -1673,7 +1674,7 @@ int pn_process_conn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) { if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) { if (pn_pointful_buffering(transport, NULL)) return 0; - int err = pn_post_close(transport, NULL); + int err = pn_post_close(transport, NULL, NULL); if (err) return err; transport->close_sent = true; } @@ -1733,13 +1734,10 @@ static ssize_t pn_output_write_header(pn_transport_t *transport, { if (transport->disp->trace & PN_TRACE_FRM) pn_transport_logf(transport, " -> %s", protocol); - if (size >= hdrsize) { - memmove(bytes, header, hdrsize); - transport->io_layers[PN_IO_AMQP].process_output = next; - return hdrsize; - } else { - return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing %s header", protocol); - } + assert(size >= hdrsize); + memmove(bytes, header, hdrsize); + transport->io_layers[PN_IO_AMQP].process_output = next; + return hdrsize; } static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t size) @@ -1752,22 +1750,19 @@ static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t size) { pn_transport_t *transport = (pn_transport_t *)io_layer->context; - if (!transport->connection) { - return 0; - } - - if (!pn_error_code(transport->error)) { - pn_error_set(transport->error, pn_process(transport), "process error"); + if (transport->connection && !transport->done_processing) { + int err = pn_process(transport); + if (err) { + pn_transport_logf(transport, "process error %i", err); + transport->done_processing = true; + } } - // write out any buffered data _before_ returning an error code, - // else we could truncate an outgoing Close frame containing a - // useful error status - if (!transport->disp->available && (transport->close_sent || pn_error_code(transport->error))) { - if (pn_error_code(transport->error)) - return pn_error_code(transport->error); - else - return PN_EOS; + // write out any buffered data _before_ returning PN_EOS, else we + // could truncate an outgoing Close frame containing a useful error + // status + if (!transport->disp->available && transport->close_sent) { + return PN_EOS; } return pn_dispatcher_output(transport->disp, bytes, size); @@ -1779,20 +1774,18 @@ static ssize_t transport_produce(pn_transport_t *transport) pn_io_layer_t *io_layer = transport->io_layers; ssize_t space = transport->output_size - transport->output_pending; - if (space == 0) { // can we expand the buffer? + if (space <= 0) { // can we expand the buffer? int more = 0; if (!transport->remote_max_frame) // no limit, so double it more = transport->output_size; else if (transport->remote_max_frame > transport->output_size) - more = transport->remote_max_frame - transport->output_size; + more = pn_min(transport->output_size, transport->remote_max_frame - transport->output_size); if (more) { - char *newbuf = (char *)malloc( transport->output_size + more ); + char *newbuf = (char *)realloc( transport->output_buf, transport->output_size + more ); if (newbuf) { - memmove( newbuf, transport->output_buf, transport->output_pending ); - free( transport->output_buf ); transport->output_buf = newbuf; transport->output_size += more; - space = more; + space += more; } } } @@ -1811,11 +1804,12 @@ static ssize_t transport_produce(pn_transport_t *transport) if (transport->output_pending) break; // return what is available if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) { - if (n == PN_EOS) + if (n < 0) { pn_transport_log(transport, " -> EOS"); - else + } + /*else pn_transport_logf(transport, " -> EOS (%" PN_ZI ") %s", n, - pn_error_text(transport->error)); + pn_error_text(transport->error));*/ } return n; } @@ -1993,22 +1987,20 @@ ssize_t pn_transport_capacity(pn_transport_t *transport) /* <0 == done */ //if (pn_error_code(transport->error)) return pn_error_code(transport->error); ssize_t capacity = transport->input_size - transport->input_pending; - if (!capacity) { + if ( capacity<=0 ) { // can we expand the size of the input buffer? int more = 0; if (!transport->local_max_frame) { // no limit (ha!) more = transport->input_size; } else if (transport->local_max_frame > transport->input_size) { - more = transport->local_max_frame - transport->input_size; + more = pn_min(transport->input_size, transport->local_max_frame - transport->input_size); } if (more) { - char *newbuf = (char *) malloc( transport->input_size + more ); + char *newbuf = (char *) realloc( transport->input_buf, transport->input_size + more ); if (newbuf) { - memmove( newbuf, transport->input_buf, transport->input_pending ); - free( transport->input_buf ); transport->input_buf = newbuf; transport->input_size += more; - capacity = more; + capacity += more; } } } @@ -2024,7 +2016,7 @@ char *pn_transport_tail(pn_transport_t *transport) return NULL; } -int pn_transport_push(pn_transport_t *transport, const char *src, size_t size) +ssize_t pn_transport_push(pn_transport_t *transport, const char *src, size_t size) { assert(transport); @@ -2032,14 +2024,19 @@ int pn_transport_push(pn_transport_t *transport, const char *src, size_t size) if (capacity < 0) { return capacity; } else if (size > (size_t) capacity) { - return PN_OVERFLOW; + size = capacity; } char *dst = pn_transport_tail(transport); assert(dst); memmove(dst, src, size); - return pn_transport_process(transport, size); + int n = pn_transport_process(transport, size); + if (n < 0) { + return n; + } else { + return size; + } } int pn_transport_process(pn_transport_t *transport, size_t size) @@ -2062,8 +2059,7 @@ int pn_transport_process(pn_transport_t *transport, size_t size) int pn_transport_close_tail(pn_transport_t *transport) { transport->tail_closed = true; - ssize_t x = transport_consume( transport ); - if (x < 0) return (int) x; + transport_consume( transport ); return 0; // XXX: what if not all input processed at this point? do we care??? } @@ -2084,7 +2080,7 @@ const char *pn_transport_head(pn_transport_t *transport) return NULL; } -int pn_transport_peek(pn_transport_t *transport, char *dst, size_t size) +ssize_t pn_transport_peek(pn_transport_t *transport, char *dst, size_t size) { assert(transport); @@ -2092,7 +2088,7 @@ int pn_transport_peek(pn_transport_t *transport, char *dst, size_t size) if (pending < 0) { return pending; } else if (size > (size_t) pending) { - return PN_UNDERFLOW; + size = pending; } if (pending > 0) { @@ -2101,7 +2097,7 @@ int pn_transport_peek(pn_transport_t *transport, char *dst, size_t size) memmove(dst, src, size); } - return 0; + return size; } void pn_transport_pop(pn_transport_t *transport, size_t size) @@ -2120,11 +2116,7 @@ void pn_transport_pop(pn_transport_t *transport, size_t size) int pn_transport_close_head(pn_transport_t *transport) { transport->head_closed = true; - if (transport->close_sent && transport->output_pending == 0) { - return 0; - } else { - return pn_error_set(transport->error, PN_ERR, "connection aborted"); - } + return 0; } // true if the transport will not generate further output http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/types.c ---------------------------------------------------------------------- diff --git a/proton-c/src/types.c b/proton-c/src/types.c index 1179db2..51abc59 100644 --- a/proton-c/src/types.c +++ b/proton-c/src/types.c @@ -23,20 +23,8 @@ #include <stdlib.h> #include <string.h> -pn_bytes_t pn_bytes(size_t size, char *start) +pn_bytes_t pn_bytes(size_t size, const char *start) { pn_bytes_t bytes = {size, start}; return bytes; } - -pn_bytes_t pn_bytes_dup(size_t size, const char *start) -{ - if (size && start) - { - char *dup = (char *) malloc(size); - memmove(dup, start, size); - return pn_bytes(size, dup); - } else { - return pn_bytes(0, NULL); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/util.c ---------------------------------------------------------------------- diff --git a/proton-c/src/util.c b/proton-c/src/util.c index e6b9af0..16e3685 100644 --- a/proton-c/src/util.c +++ b/proton-c/src/util.c @@ -139,20 +139,28 @@ void pni_urldecode(const char *src, char *dst) // Parse URL syntax: // [ <scheme> :// ] [ <user> [ : <password> ] @ ] <host> [ : <port> ] [ / <path> ] -// <user>, <password>, <host>, <port> cannot contain any of '@', ':', '/' +// <scheme>, <user>, <password>, <port> cannot contain any of '@', ':', '/' +// If the first character of <host> is '[' then it can contain any character up to ']' (this is to allow IPv6 +// literal syntax). Otherwise it also cannot contain '@', ':', '/' +// <host> is not optional but it can be null! If it is not present an empty string will be returned // <path> can contain any character void pni_parse_url(char *url, char **scheme, char **user, char **pass, char **host, char **port, char **path) { if (!url) return; - char *scheme_end = strstr(url, "://"); - if (scheme_end) { - *scheme_end = '\0'; - *scheme = url; - url = scheme_end + 3; + char *slash = strchr(url, '/'); + + if (slash && slash>url) { + char *scheme_end = strstr(slash-1, "://"); + + if (scheme_end && scheme_end<slash) { + *scheme_end = '\0'; + *scheme = url; + url = scheme_end + 3; + slash = strchr(url, '/'); + } } - char *slash = strchr(url, '/'); if (slash) { *slash = '\0'; *path = slash + 1; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
