Author: rhs
Date: Fri Nov 2 15:53:01 2012
New Revision: 1405017

URL: http://svn.apache.org/viewvc?rev=1405017&view=rev
Log:
added subscription tracking

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py qpid/proton/trunk/proton-c/include/proton/driver.h qpid/proton/trunk/proton-c/include/proton/messenger.h qpid/proton/trunk/proton-c/src/driver.c qpid/proton/trunk/proton-c/src/messenger.c Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1405017&r1=1405016&r2=1405017&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/bindings/python/proton.py (original) +++ qpid/proton/trunk/proton-c/bindings/python/proton.py Fri Nov 2 15:53:01 2012 @@ -329,7 +329,9 @@ track the status of this many outgoing d @type source: string @param source: the source of messages to subscribe to """ - self._check(pn_messenger_subscribe(self._mng, source)) + sub_impl = pn_messenger_subscribe(self._mng, source) + if not sub_impl: + self._check(PN_ERR) def put(self, message): """ Modified: qpid/proton/trunk/proton-c/include/proton/driver.h URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1405017&r1=1405016&r2=1405017&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/include/proton/driver.h (original) +++ qpid/proton/trunk/proton-c/include/proton/driver.h Fri Nov 2 15:53:01 2012 @@ -189,6 +189,8 @@ pn_connector_t *pn_listener_accept(pn_li */ void *pn_listener_context(pn_listener_t *listener); +void pn_listener_set_context(pn_listener_t *listener, void *context); + /** Close the socket used by the listener. * * @param[in] listener the listener whose socket will be closed. 
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1405017&r1=1405016&r2=1405017&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/include/proton/messenger.h (original) +++ qpid/proton/trunk/proton-c/include/proton/messenger.h Fri Nov 2 15:53:01 2012 @@ -34,6 +34,7 @@ extern "C" { */ typedef struct pn_messenger_t pn_messenger_t; /**< Messenger*/ +typedef struct pn_subscription_t pn_subscription_t; /**< Subscription*/ typedef int64_t pn_tracker_t; typedef enum { @@ -209,10 +210,13 @@ int pn_messenger_stop(pn_messenger_t *me * @param[in] messenger the messenger to subscribe * @param[in] source * - * @return an error code or zero on success - * @see error.h + * @return a subscription */ -int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source); +pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source); + +void *pn_subscription_get_context(pn_subscription_t *sub); + +void pn_subscription_set_context(pn_subscription_t *sub, void *context); /** Puts a message on the outgoing message queue for a messenger. * @@ -289,6 +293,8 @@ int pn_messenger_get(pn_messenger_t *mes */ pn_tracker_t pn_messenger_incoming_tracker(pn_messenger_t *messenger); +pn_subscription_t *pn_messenger_incoming_subscription(pn_messenger_t *messenger); + #define PN_CUMULATIVE (0x1) /** Accepts the incoming messages identified by the tracker. 
Use the Modified: qpid/proton/trunk/proton-c/src/driver.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1405017&r1=1405016&r2=1405017&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/driver.c (original) +++ qpid/proton/trunk/proton-c/src/driver.c Fri Nov 2 15:53:01 2012 @@ -21,6 +21,7 @@ #define _POSIX_C_SOURCE 1 +#include <assert.h> #include <poll.h> #include <stdio.h> #include <time.h> @@ -210,6 +211,12 @@ void *pn_listener_context(pn_listener_t return l ? l->context : NULL; } +void pn_listener_set_context(pn_listener_t *listener, void *context) +{ + assert(listener); + listener->context = context; +} + static void pn_configure_sock(int sock) { // this would be nice, but doesn't appear to exist on linux /* Modified: qpid/proton/trunk/proton-c/src/messenger.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1405017&r1=1405016&r2=1405017&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/messenger.c (original) +++ qpid/proton/trunk/proton-c/src/messenger.c Fri Nov 2 15:53:01 2012 @@ -55,10 +55,19 @@ struct pn_messenger_t { pn_queue_t outgoing; pn_queue_t incoming; pn_accept_mode_t accept_mode; + pn_subscription_t *subscriptions; + size_t sub_capacity; + size_t sub_count; + pn_subscription_t *incoming_subscription; pn_buffer_t *buffer; pn_error_t *error; }; +struct pn_subscription_t { + char *scheme; + void *context; +}; + void pn_queue_init(pn_queue_t *queue) { queue->capacity = 1024; @@ -241,6 +250,10 @@ pn_messenger_t *pn_messenger(const char pn_queue_init(&m->outgoing); pn_queue_init(&m->incoming); m->accept_mode = PN_ACCEPT_MODE_AUTO; + m->subscriptions = NULL; + m->sub_capacity = 0; + m->sub_count = 0; + m->incoming_subscription = NULL; m->buffer = pn_buffer(1024); m->error = pn_error(); } @@ -326,6 +339,10 @@ void pn_messenger_free(pn_messenger_t *m 
pn_error_free(messenger->error); pn_queue_tini(&messenger->incoming); pn_queue_tini(&messenger->outgoing); + for (int i = 0; i < messenger->sub_count; i++) { + free(messenger->subscriptions[i].scheme); + } + free(messenger->subscriptions); free(messenger); } } @@ -461,7 +478,8 @@ int pn_messenger_tsync(pn_messenger_t *m pn_listener_t *l; while ((l = pn_driver_listener(messenger->driver))) { - char *scheme = pn_listener_context(l); + pn_subscription_t *sub = pn_listener_context(l); + char *scheme = sub->scheme; pn_connector_t *c = pn_listener_accept(l); pn_transport_t *t = pn_connector_transport(c); pn_ssl_t *ssl = pn_ssl(t); @@ -545,7 +563,6 @@ int pn_messenger_stop(pn_messenger_t *me pn_listener_close(l); pn_listener_t *prev = l; l = pn_listener_next(l); - free(pn_listener_context(prev)); pn_listener_free(prev); } @@ -635,6 +652,27 @@ pn_connection_t *pn_messenger_resolve(pn return connection; } +pn_subscription_t *pn_subscription(pn_messenger_t *messenger, const char *scheme) +{ + PN_ENSURE(messenger->subscriptions, messenger->sub_capacity, messenger->sub_count + 1); + pn_subscription_t *sub = messenger->subscriptions + messenger->sub_count++; + sub->scheme = pn_strdup(scheme); + sub->context = NULL; + return sub; +} + +void *pn_subscription_get_context(pn_subscription_t *sub) +{ + assert(sub); + return sub->context; +} + +void pn_subscription_set_context(pn_subscription_t *sub, void *context) +{ + assert(sub); + sub->context = context; +} + pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, bool sender) { char copy[(address ? strlen(address) : 0) + 1]; @@ -663,12 +701,11 @@ pn_link_t *pn_messenger_link(pn_messenge pn_session_open(ssn); link = sender ? 
pn_sender(ssn, "sender-xxx") : pn_receiver(ssn, "receiver-xxx"); // XXX - if (sender) { - pn_terminus_set_address(pn_link_target(link), name); - pn_terminus_set_address(pn_link_source(link), name); - } else { - pn_terminus_set_address(pn_link_target(link), name); - pn_terminus_set_address(pn_link_source(link), name); + pn_terminus_set_address(pn_link_target(link), name); + pn_terminus_set_address(pn_link_source(link), name); + if (!sender) { + pn_subscription_t *sub = pn_subscription(messenger, NULL); + pn_link_set_context(link, sub); } pn_link_open(link); return link; @@ -684,7 +721,7 @@ pn_link_t *pn_messenger_target(pn_messen return pn_messenger_link(messenger, target, true); } -int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source) +pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source) { char copy[strlen(source) + 1]; strcpy(copy, source); @@ -700,23 +737,27 @@ int pn_messenger_subscribe(pn_messenger_ if (host[0] == '~') { pn_listener_t *lnr = pn_listener(messenger->driver, host + 1, - port ? port : default_port(scheme), - pn_strdup(scheme)); + port ? 
port : default_port(scheme), NULL); if (lnr) { - return 0; + pn_subscription_t *sub = pn_subscription(messenger, scheme); + pn_listener_set_context(lnr, sub); + return sub; } else { - return pn_error_format(messenger->error, PN_ERR, - "unable to subscribe to source: %s (%s)", source, - pn_driver_error(messenger->driver)); + pn_error_format(messenger->error, PN_ERR, + "unable to subscribe to source: %s (%s)", source, + pn_driver_error(messenger->driver)); + return NULL; } } else { pn_link_t *src = pn_messenger_source(messenger, source); if (src) { - return 0; + pn_subscription_t *sub = pn_link_get_context(src); + return sub; } else { - return pn_error_format(messenger->error, PN_ERR, - "unable to subscribe to source: %s (%s)", source, - pn_driver_error(messenger->driver)); + pn_error_format(messenger->error, PN_ERR, + "unable to subscribe to source: %s (%s)", source, + pn_driver_error(messenger->driver)); + return NULL; } } } @@ -962,6 +1003,7 @@ int pn_messenger_get(pn_messenger_t *mes while (d) { if (pn_delivery_readable(d) && !pn_delivery_partial(d)) { pn_link_t *l = pn_delivery_link(d); + pn_subscription_t *sub = pn_link_get_context(l); size_t pending = pn_delivery_pending(d); pn_buffer_t *buf = messenger->buffer; int err = pn_buffer_ensure(buf, pending + 1); @@ -978,6 +1020,7 @@ int pn_messenger_get(pn_messenger_t *mes return pn_error_format(messenger->error, n, "PN_EOS expected"); } pn_queue_add(&messenger->incoming, d); + messenger->incoming_subscription = sub; if (msg) { int err = pn_message_decode(msg, encoded, pending); if (err) { @@ -1018,6 +1061,12 @@ pn_tracker_t pn_messenger_incoming_track return pn_tracker(INCOMING, messenger->incoming.hwm - 1); } +pn_subscription_t *pn_messenger_incoming_subscription(pn_messenger_t *messenger) +{ + assert(messenger); + return messenger->incoming_subscription; +} + int pn_messenger_accept(pn_messenger_t *messenger, pn_tracker_t tracker, int flags) { if (pn_tracker_direction(tracker) != INCOMING) { 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org