This looks like a patch for a commit I made on trunk a while back. Did something go awry with creating the patch set?
--Rafael On Wed, 2012-07-25 at 14:05 -0400, Darryl L. Pierce wrote: > From: rhs <rhs@13f79535-47bb-0310-9956-ffa450edef68> > > git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/trunk@1356935 > 13f79535-47bb-0310-9956-ffa450edef68 > --- > proton-c/include/proton/engine.h | 1 + > proton-c/include/proton/error.h | 1 + > proton-c/include/proton/messenger.h | 18 +++++ > proton-c/src/dispatcher/dispatcher.c | 4 +- > proton-c/src/driver.c | 25 ++++--- > proton-c/src/messenger.c | 129 > +++++++++++++++++++++++------------ > tests/proton_tests/__init__.py | 1 + > tests/proton_tests/messenger.py | 88 ++++++++++++++++++++++++ > 8 files changed, 212 insertions(+), 55 deletions(-) > create mode 100644 tests/proton_tests/messenger.py > > diff --git a/proton-c/include/proton/engine.h > b/proton-c/include/proton/engine.h > index b8873bb..610ab47 100644 > --- a/proton-c/include/proton/engine.h > +++ b/proton-c/include/proton/engine.h > @@ -76,6 +76,7 @@ typedef int pn_trace_t; > #define PN_TRACE_OFF (0) > #define PN_TRACE_RAW (1) > #define PN_TRACE_FRM (2) > +#define PN_TRACE_DRV (4) > > #define PN_SESSION_WINDOW (1024) > > diff --git a/proton-c/include/proton/error.h b/proton-c/include/proton/error.h > index 86115c1..ee317ab 100644 > --- a/proton-c/include/proton/error.h > +++ b/proton-c/include/proton/error.h > @@ -32,6 +32,7 @@ typedef struct pn_error_t pn_error_t; > #define PN_UNDERFLOW (-4) > #define PN_STATE_ERR (-5) > #define PN_ARG_ERR (-6) > +#define PN_TIMEOUT (-7) > > const char *pn_code(int code); > > diff --git a/proton-c/include/proton/messenger.h > b/proton-c/include/proton/messenger.h > index 42d8cc4..c53bc4a 100644 > --- a/proton-c/include/proton/messenger.h > +++ b/proton-c/include/proton/messenger.h > @@ -48,6 +48,24 @@ pn_messenger_t *pn_messenger(const char *name); > */ > const char *pn_messenger_name(pn_messenger_t *messenger); > > +/** Sets the timeout for a Messenger. A negative timeout means > + * infinite. > + * > + * @param[in] messenger the messenger > + * @param[timeout] the new timeout for the messenger, in milliseconds > + * > + * @return an error code or zero if there is no error > + */ > +int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout); > + > +/** Retrieves the timeout for a Messenger. > + * > + * @param[in] messenger the messenger > + * > + * @return the timeout for the messenger, in milliseconds > + */ > +int pn_messenger_get_timeout(pn_messenger_t *messenger); > + > /** Frees a Messenger. > * > * @param[in] messenger the messenger to free, no longer valid on > diff --git a/proton-c/src/dispatcher/dispatcher.c > b/proton-c/src/dispatcher/dispatcher.c > index a9733ae..671a791 100644 > --- a/proton-c/src/dispatcher/dispatcher.c > +++ b/proton-c/src/dispatcher/dispatcher.c > @@ -82,8 +82,8 @@ static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, > pn_dir_t dir, > uint8_t code = scanned ? code64 : 0; > size_t n = SCRATCH; > pn_data_format(args, disp->scratch, &n); > - fprintf(stderr, "[%u] %s %s %s", ch, dir == OUT ? "->" : "<-", > - disp->names[code], disp->scratch); > + fprintf(stderr, "[%p:%u] %s %s %s", (void *) disp, ch, > + dir == OUT ? "->" : "<-", disp->names[code], disp->scratch); > if (size) { > size_t capacity = 4*size + 1; > char buf[capacity]; > diff --git a/proton-c/src/driver.c b/proton-c/src/driver.c > index af99fe4..dda169b 100644 > --- a/proton-c/src/driver.c > +++ b/proton-c/src/driver.c > @@ -68,11 +68,13 @@ struct pn_listener_t { > }; > > #define IO_BUF_SIZE (4*1024) > +#define NAME_MAX (256) > > struct pn_connector_t { > pn_driver_t *driver; > pn_connector_t *connector_next; > pn_connector_t *connector_prev; > + char name[256]; > int idx; > bool pending_tick; > bool pending_read; > @@ -165,7 +167,8 @@ pn_listener_t *pn_listener(pn_driver_t *driver, const > char *host, > > pn_listener_t *l = pn_listener_fd(driver, sock, context); > > - printf("Listening on %s:%s\n", host, port); > + if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) > + printf("Listening on %s:%s\n", host, port); > return l; > } > > @@ -226,9 +229,10 @@ pn_connector_t *pn_listener_accept(pn_listener_t *l) > return NULL; > } else { > pn_configure_sock(sock); > - if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW)) > - printf("accepted from %s:%s\n", host, serv); > + if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) > + fprintf(stderr, "Accepted from %s:%s\n", host, serv); > pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL); > + snprintf(c->name, NAME_MAX, "%s:%s", host, serv); > c->listener = l; > return c; > } > @@ -303,7 +307,9 @@ pn_connector_t *pn_connector(pn_driver_t *driver, const > char *host, > freeaddrinfo(addr); > > pn_connector_t *c = pn_connector_fd(driver, sock, context); > - printf("Connected to %s:%s\n", host, port); > + snprintf(c->name, NAME_MAX, "%s:%s", host, port); > + if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) > + fprintf(stderr, "Connected to %s\n", c->name); > return c; > } > > @@ -332,6 +338,7 @@ pn_connector_t *pn_connector_fd(pn_driver_t *driver, int > fd, void *context) > c->pending_tick = false; > c->pending_read = false; > c->pending_write = false; > + c->name[0] = '\0'; > c->idx = 0; > c->fd = fd; > c->status = PN_SEL_RD | PN_SEL_WR; > @@ -460,7 +467,7 @@ static void pn_connector_process_input(pn_connector_t > *ctor) > if (n == PN_EOS) { > pn_connector_consume(ctor, ctor->input_size); > } else { > - printf("error in process_input: %s\n", pn_code(n)); > + fprintf(stderr, "error in process_input: %s\n", pn_code(n)); > } > ctor->input_done = true; > break; > @@ -658,8 +665,9 @@ void pn_connector_process(pn_connector_t *c) { > c->pending_write = false; > } > if (c->output_size == 0 && c->input_done && c->output_done) { > - if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW)) > - fprintf(stderr, "closed\n"); > + if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) { > + fprintf(stderr, "Closed %s\n", c->name); > + } > pn_connector_close(c); > } > } > @@ -686,7 +694,8 @@ pn_driver_t *pn_driver() > d->ctrl[0] = 0; > d->ctrl[1] = 0; > d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) | > - (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF)); > + (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) | > + (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF)); > > // XXX > if (pipe(d->ctrl)) { > diff --git a/proton-c/src/messenger.c b/proton-c/src/messenger.c > index a12025a..56da348 100644 > --- a/proton-c/src/messenger.c > +++ b/proton-c/src/messenger.c > @@ -21,6 +21,7 @@ > > #include <proton/messenger.h> > #include <proton/driver.h> > +#include <proton/util.h> > #include <stdlib.h> > #include <string.h> > #include <stdio.h> > @@ -29,6 +30,7 @@ > > struct pn_messenger_t { > char *name; > + int timeout; > pn_driver_t *driver; > pn_connector_t *connectors[1024]; > size_t size; > @@ -57,6 +59,7 @@ pn_messenger_t *pn_messenger(const char *name) > > if (m) { > m->name = build_name(name); > + m->timeout = -1; > m->driver = pn_driver(); > m->size = 0; > m->listeners = 0; > @@ -73,6 +76,18 @@ const char *pn_messenger_name(pn_messenger_t *messenger) > return messenger->name; > } > > +int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout) > +{ > + if (!messenger) return PN_ARG_ERR; > + messenger->timeout = timeout; > + return 0; > +} > + > +int pn_messenger_get_timeout(pn_messenger_t *messenger) > +{ > + return messenger ? messenger->timeout : 0; > +} > + > void pn_messenger_free(pn_messenger_t *messenger) > { > if (messenger) { > @@ -171,14 +186,28 @@ void pn_messenger_reclaim(pn_messenger_t *messenger, > pn_connection_t *conn) > } > } > > -int pn_messenger_sync(pn_messenger_t *messenger, bool > (*predicate)(pn_messenger_t *)) > +long int millis(struct timeval tv) > +{ > + return tv.tv_sec * 1000 + tv.tv_usec/1000; > +} > + > +int pn_messenger_tsync(pn_messenger_t *messenger, bool > (*predicate)(pn_messenger_t *), int timeout) > { > for (int i = 0; i < messenger->size; i++) { > pn_connector_process(messenger->connectors[i]); > } > > - while (!predicate(messenger)) { > - pn_driver_wait(messenger->driver, -1); > + struct timeval now; > + if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n"); > + long int deadline = millis(now) + timeout; > + bool pred; > + > + while (true) { > + pred = predicate(messenger); > + int remaining = deadline - millis(now); > + if (pred || (timeout >= 0 && remaining < 0)) break; > + > + pn_driver_wait(messenger->driver, remaining); > > pn_listener_t *l; > while ((l = pn_driver_listener(messenger->driver))) { > @@ -214,47 +243,30 @@ int pn_messenger_sync(pn_messenger_t *messenger, bool > (*predicate)(pn_messenger_ > pn_connector_process(c); > } > } > + > + if (timeout >= 0) { > + if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n"); > + } > } > > - return 0; > + return pred ? 0 : PN_TIMEOUT; > } > > -bool pn_messenger_linked(pn_messenger_t *messenger) > +int pn_messenger_sync(pn_messenger_t *messenger, bool > (*predicate)(pn_messenger_t *)) > { > - for (int i = 0; i < messenger->size; i++) { > - pn_connector_t *ctor = messenger->connectors[i]; > - pn_connection_t *conn = pn_connector_connection(ctor); > - pn_state_t state = pn_connection_state(conn); > - if ((state == (PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT)) || > - (state == (PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE))) { > - return false; > - } > - > - if (pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT) || > - pn_link_head(conn, PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE)) { > - return false; > - } > - } > - > - return true; > + return pn_messenger_tsync(messenger, predicate, messenger->timeout); > } > > int pn_messenger_start(pn_messenger_t *messenger) > { > if (!messenger) return PN_ARG_ERR; > - return pn_messenger_sync(messenger, pn_messenger_linked); > + // right now this is a noop > + return 0; > } > > -bool pn_messenger_unlinked(pn_messenger_t *messenger) > +bool pn_messenger_stopped(pn_messenger_t *messenger) > { > - for (int i = 0; i < messenger->size; i++) { > - pn_connector_t *ctor = messenger->connectors[i]; > - pn_connection_t *conn = pn_connector_connection(ctor); > - pn_state_t state = pn_connection_state(conn); > - if (state != (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) > - return false; > - } > - return true; > + return messenger->size == 0; > } > > int pn_messenger_stop(pn_messenger_t *messenger) > @@ -272,7 +284,7 @@ int pn_messenger_stop(pn_messenger_t *messenger) > pn_connection_close(conn); > } > > - return pn_messenger_sync(messenger, pn_messenger_unlinked); > + return pn_messenger_sync(messenger, pn_messenger_stopped); > } > > static void parse_address(char *address, char **domain, char **name) > @@ -301,6 +313,18 @@ bool pn_streq(const char *a, const char *b) > > pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char > *domain) > { > + char buf[strlen(domain) + 1]; > + if (domain) { > + strcpy(buf, domain); > + } else { > + buf[0] = '\0'; > + } > + char *user = NULL; > + char *pass = NULL; > + char *host = "0.0.0.0"; > + char *port = "5672"; > + parse_url(buf, &user, &pass, &host, &port); > + > for (int i = 0; i < messenger->size; i++) { > pn_connection_t *connection = > pn_connector_connection(messenger->connectors[i]); > const char *container = pn_connection_remote_container(connection); > @@ -309,12 +333,16 @@ pn_connection_t *pn_messenger_domain(pn_messenger_t > *messenger, const char *doma > return connection; > } > > - pn_connector_t *connector = pn_connector(messenger->driver, domain, > "5672", NULL); > + pn_connector_t *connector = pn_connector(messenger->driver, host, port, > NULL); > if (!connector) return NULL; > messenger->connectors[messenger->size++] = connector; > pn_sasl_t *sasl = pn_connector_sasl(connector); > - pn_sasl_mechanisms(sasl, "ANONYMOUS"); > - pn_sasl_client(sasl); > + if (user) { > + pn_sasl_plain(sasl, user, pass); > + } else { > + pn_sasl_mechanisms(sasl, "ANONYMOUS"); > + pn_sasl_client(sasl); > + } > pn_connection_t *connection = pn_connection(); > pn_connection_set_container(connection, messenger->name); > pn_connection_set_hostname(connection, domain); > @@ -378,11 +406,15 @@ pn_listener_t *pn_messenger_isource(pn_messenger_t > *messenger, const char *sourc > { > char buf[strlen(source) + 1]; > strcpy(buf, source); > - char *domain; > - char *name; > + char *domain, *name; > parse_address(buf, &domain, &name); > + char *user = NULL; > + char *pass = NULL; > + char *host = "0.0.0.0"; > + char *port = "5672"; > + parse_url(domain + 1, &user, &pass, &host, &port); > > - pn_listener_t *listener = pn_listener(messenger->driver, domain + 1, > "5672", NULL); > + pn_listener_t *listener = pn_listener(messenger->driver, host, port, NULL); > if (listener) { > messenger->listeners++; > } > @@ -428,6 +460,8 @@ static void outward_munge(pn_messenger_t *mng, > pn_message_t *msg) > } > } > > +bool false_pred(pn_messenger_t *messenger) { return false; } > + > int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg) > { > if (!messenger) return PN_ARG_ERR; > @@ -459,6 +493,7 @@ int pn_messenger_put(pn_messenger_t *messenger, > pn_message_t *msg) > return n; > } else { > pn_advance(sender); > + pn_messenger_tsync(messenger, false_pred, 0); > return 0; > } > } > @@ -469,8 +504,6 @@ int pn_messenger_put(pn_messenger_t *messenger, > pn_message_t *msg) > > bool pn_messenger_sent(pn_messenger_t *messenger) > { > - // if (!pn_messenger_linked(messenger)) return false; > - > for (int i = 0; i < messenger->size; i++) { > pn_connector_t *ctor = messenger->connectors[i]; > pn_connection_t *conn = pn_connector_connection(ctor); > @@ -497,8 +530,6 @@ bool pn_messenger_sent(pn_messenger_t *messenger) > > bool pn_messenger_rcvd(pn_messenger_t *messenger) > { > - // if (!pn_messenger_linked(messenger)) return false; > - > for (int i = 0; i < messenger->size; i++) { > pn_connector_t *ctor = messenger->connectors[i]; > pn_connection_t *conn = pn_connector_connection(ctor); > @@ -532,6 +563,8 @@ int pn_messenger_recv(pn_messenger_t *messenger, int n) > > int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg) > { > + if (!messenger) return PN_ARG_ERR; > + > for (int i = 0; i < messenger->size; i++) { > pn_connector_t *ctor = messenger->connectors[i]; > pn_connection_t *conn = pn_connector_connection(ctor); > @@ -545,10 +578,14 @@ int pn_messenger_get(pn_messenger_t *messenger, > pn_message_t *msg) > ssize_t n = pn_recv(l, buf, 1024); > pn_settle(d); > if (n < 0) return n; > - int err = pn_message_decode(msg, buf, n); > - if (err) { > - return pn_error_format(messenger->error, err, "error decoding > message: %s", > + if (msg) { > + int err = pn_message_decode(msg, buf, n); > + if (err) { > + return pn_error_format(messenger->error, err, "error decoding > message: %s", > pn_message_error(msg)); > + } else { > + return 0; > + } > } else { > return 0; > } > @@ -564,6 +601,8 @@ int pn_messenger_get(pn_messenger_t *messenger, > pn_message_t *msg) > > int pn_messenger_queued(pn_messenger_t *messenger, bool sender) > { > + if (!messenger) return 0; > + > int result = 0; > > for (int i = 0; i < messenger->size; i++) { > diff --git a/tests/proton_tests/__init__.py b/tests/proton_tests/__init__.py > index a8a4d52..b467cf5 100644 > --- a/tests/proton_tests/__init__.py > +++ b/tests/proton_tests/__init__.py > @@ -19,3 +19,4 @@ > > import proton_tests.engine > import proton_tests.message > +import proton_tests.messenger > diff --git a/tests/proton_tests/messenger.py b/tests/proton_tests/messenger.py > new file mode 100644 > index 0000000..17161cd > --- /dev/null > +++ b/tests/proton_tests/messenger.py > @@ -0,0 +1,88 @@ > +# > +# Licensed to the Apache Software Foundation (ASF) under one > +# or more contributor license agreements. See the NOTICE file > +# distributed with this work for additional information > +# regarding copyright ownership. The ASF licenses this file > +# to you under the Apache License, Version 2.0 (the > +# "License"); you may not use this file except in compliance > +# with the License. You may obtain a copy of the License at > +# > +# http://www.apache.org/licenses/LICENSE-2.0 > +# > +# Unless required by applicable law or agreed to in writing, > +# software distributed under the License is distributed on an > +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > +# KIND, either express or implied. See the License for the > +# specific language governing permissions and limitations > +# under the License. > +# > + > +import os, common, xproton > +from xproton import * > +from threading import Thread > + > +class Test(common.Test): > + > + def setup(self): > + self.server = pn_messenger("server") > + pn_messenger_set_timeout(self.server, 10000) > + pn_messenger_start(self.server) > + pn_messenger_subscribe(self.server, "//~0.0.0.0:12345") > + self.thread = Thread(target=self.run) > + self.running = True > + self.thread.start() > + > + self.client = pn_messenger("client") > + pn_messenger_set_timeout(self.client, 10000) > + pn_messenger_start(self.client) > + > + def teardown(self): > + self.running = False > + msg = pn_message() > + pn_message_set_address(msg, "//0.0.0.0:12345") > + pn_messenger_put(self.client, msg) > + pn_messenger_send(self.client) > + pn_messenger_stop(self.client) > + self.thread.join() > + pn_messenger_free(self.client) > + pn_messenger_free(self.server) > + self.client = None > + self.server = None > + > +class MessengerTest(Test): > + > + def run(self): > + msg = pn_message() > + while self.running: > + pn_messenger_recv(self.server, 10) > + while pn_messenger_incoming(self.server): > + if pn_messenger_get(self.server, msg): > + print pn_messenger_error(self.server) > + else: > + reply_to = pn_message_get_reply_to(msg) > + if reply_to: > + pn_message_set_address(msg, reply_to) > + pn_messenger_put(self.server, msg) > + pn_messenger_stop(self.server) > + > + def testSendReceive(self): > + msg = pn_message() > + pn_message_set_address(msg, "//0.0.0.0:12345") > + pn_message_set_subject(msg, "Hello World!") > + body = "First the world, then the galaxy!" > + pn_message_load(msg, body) > + pn_messenger_put(self.client, msg) > + pn_messenger_send(self.client) > + > + reply = pn_message() > + assert not pn_messenger_recv(self.client, 1) > + assert pn_messenger_incoming(self.client) == 1 > + assert not pn_messenger_get(self.client, reply) > + > + assert pn_message_get_subject(reply) == "Hello World!" > + cd, rbod = pn_message_save(reply, 1024) > + assert not cd > + assert rbod == body > + > + pn_message_free(msg) > + pn_message_free(reply) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
