This looks like a patch for a commit I made on trunk a while back. Did
something go awry with creating the patch set?

--Rafael

On Wed, 2012-07-25 at 14:05 -0400, Darryl L. Pierce wrote:
> From: rhs <rhs@13f79535-47bb-0310-9956-ffa450edef68>
> 
> git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/trunk@1356935 13f79535-47bb-0310-9956-ffa450edef68
> ---
>  proton-c/include/proton/engine.h     |   1 +
>  proton-c/include/proton/error.h      |   1 +
>  proton-c/include/proton/messenger.h  |  18 +++++
>  proton-c/src/dispatcher/dispatcher.c |   4 +-
>  proton-c/src/driver.c                |  25 ++++---
>  proton-c/src/messenger.c             | 129 +++++++++++++++++++++++------------
>  tests/proton_tests/__init__.py       |   1 +
>  tests/proton_tests/messenger.py      |  88 ++++++++++++++++++++++++
>  8 files changed, 212 insertions(+), 55 deletions(-)
>  create mode 100644 tests/proton_tests/messenger.py
> 
> diff --git a/proton-c/include/proton/engine.h b/proton-c/include/proton/engine.h
> index b8873bb..610ab47 100644
> --- a/proton-c/include/proton/engine.h
> +++ b/proton-c/include/proton/engine.h
> @@ -76,6 +76,7 @@ typedef int pn_trace_t;
>  #define PN_TRACE_OFF (0)
>  #define PN_TRACE_RAW (1)
>  #define PN_TRACE_FRM (2)
> +#define PN_TRACE_DRV (4)
>  
>  #define PN_SESSION_WINDOW (1024)
>  
> diff --git a/proton-c/include/proton/error.h b/proton-c/include/proton/error.h
> index 86115c1..ee317ab 100644
> --- a/proton-c/include/proton/error.h
> +++ b/proton-c/include/proton/error.h
> @@ -32,6 +32,7 @@ typedef struct pn_error_t pn_error_t;
>  #define PN_UNDERFLOW (-4)
>  #define PN_STATE_ERR (-5)
>  #define PN_ARG_ERR (-6)
> +#define PN_TIMEOUT (-7)
>  
>  const char *pn_code(int code);
>  
> diff --git a/proton-c/include/proton/messenger.h b/proton-c/include/proton/messenger.h
> index 42d8cc4..c53bc4a 100644
> --- a/proton-c/include/proton/messenger.h
> +++ b/proton-c/include/proton/messenger.h
> @@ -48,6 +48,24 @@ pn_messenger_t *pn_messenger(const char *name);
>   */
>  const char *pn_messenger_name(pn_messenger_t *messenger);
>  
> +/** Sets the timeout for a Messenger. A negative timeout means
> + * infinite.
> + *
> + * @param[in] messenger the messenger
> + * @param[timeout] the new timeout for the messenger, in milliseconds
> + *
> + * @return an error code or zero if there is no error
> + */
> +int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout);
> +
> +/** Retrieves the timeout for a Messenger.
> + *
> + * @param[in] messenger the messenger
> + *
> + * @return the timeout for the messenger, in milliseconds
> + */
> +int pn_messenger_get_timeout(pn_messenger_t *messenger);
> +
>  /** Frees a Messenger.
>   *
>   * @param[in] messenger the messenger to free, no longer valid on
> diff --git a/proton-c/src/dispatcher/dispatcher.c b/proton-c/src/dispatcher/dispatcher.c
> index a9733ae..671a791 100644
> --- a/proton-c/src/dispatcher/dispatcher.c
> +++ b/proton-c/src/dispatcher/dispatcher.c
> @@ -82,8 +82,8 @@ static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir,
>      uint8_t code = scanned ? code64 : 0;
>      size_t n = SCRATCH;
>      pn_data_format(args, disp->scratch, &n);
> -    fprintf(stderr, "[%u] %s %s %s", ch, dir == OUT ? "->" : "<-",
> -            disp->names[code], disp->scratch);
> +    fprintf(stderr, "[%p:%u] %s %s %s", (void *) disp, ch,
> +            dir == OUT ? "->" : "<-", disp->names[code], disp->scratch);
>      if (size) {
>        size_t capacity = 4*size + 1;
>        char buf[capacity];
> diff --git a/proton-c/src/driver.c b/proton-c/src/driver.c
> index af99fe4..dda169b 100644
> --- a/proton-c/src/driver.c
> +++ b/proton-c/src/driver.c
> @@ -68,11 +68,13 @@ struct pn_listener_t {
>  };
>  
>  #define IO_BUF_SIZE (4*1024)
> +#define NAME_MAX (256)
>  
>  struct pn_connector_t {
>    pn_driver_t *driver;
>    pn_connector_t *connector_next;
>    pn_connector_t *connector_prev;
> +  char name[256];
>    int idx;
>    bool pending_tick;
>    bool pending_read;
> @@ -165,7 +167,8 @@ pn_listener_t *pn_listener(pn_driver_t *driver, const char *host,
>  
>    pn_listener_t *l = pn_listener_fd(driver, sock, context);
>  
> -  printf("Listening on %s:%s\n", host, port);
> +  if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
> +    printf("Listening on %s:%s\n", host, port);
>    return l;
>  }
>  
> @@ -226,9 +229,10 @@ pn_connector_t *pn_listener_accept(pn_listener_t *l)
>        return NULL;
>      } else {
>        pn_configure_sock(sock);
> -      if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW))
> -        printf("accepted from %s:%s\n", host, serv);
> +      if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
> +        fprintf(stderr, "Accepted from %s:%s\n", host, serv);
>        pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL);
> +      snprintf(c->name, NAME_MAX, "%s:%s", host, serv);
>        c->listener = l;
>        return c;
>      }
> @@ -303,7 +307,9 @@ pn_connector_t *pn_connector(pn_driver_t *driver, const char *host,
>    freeaddrinfo(addr);
>  
>    pn_connector_t *c = pn_connector_fd(driver, sock, context);
> -  printf("Connected to %s:%s\n", host, port);
> +  snprintf(c->name, NAME_MAX, "%s:%s", host, port);
> +  if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
> +    fprintf(stderr, "Connected to %s\n", c->name);
>    return c;
>  }
>  
> @@ -332,6 +338,7 @@ pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
>    c->pending_tick = false;
>    c->pending_read = false;
>    c->pending_write = false;
> +  c->name[0] = '\0';
>    c->idx = 0;
>    c->fd = fd;
>    c->status = PN_SEL_RD | PN_SEL_WR;
> @@ -460,7 +467,7 @@ static void pn_connector_process_input(pn_connector_t *ctor)
>        if (n == PN_EOS) {
>          pn_connector_consume(ctor, ctor->input_size);
>        } else {
> -        printf("error in process_input: %s\n", pn_code(n));
> +        fprintf(stderr, "error in process_input: %s\n", pn_code(n));
>        }
>        ctor->input_done = true;
>        break;
> @@ -658,8 +665,9 @@ void pn_connector_process(pn_connector_t *c) {
>        c->pending_write = false;
>      }
>      if (c->output_size == 0 && c->input_done && c->output_done) {
> -      if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW))
> -        fprintf(stderr, "closed\n");
> +      if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
> +        fprintf(stderr, "Closed %s\n", c->name);
> +      }
>        pn_connector_close(c);
>      }
>    }
> @@ -686,7 +694,8 @@ pn_driver_t *pn_driver()
>    d->ctrl[0] = 0;
>    d->ctrl[1] = 0;
>    d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
> -              (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF));
> +              (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
> +              (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF));
>  
>    // XXX
>    if (pipe(d->ctrl)) {
> diff --git a/proton-c/src/messenger.c b/proton-c/src/messenger.c
> index a12025a..56da348 100644
> --- a/proton-c/src/messenger.c
> +++ b/proton-c/src/messenger.c
> @@ -21,6 +21,7 @@
>  
>  #include <proton/messenger.h>
>  #include <proton/driver.h>
> +#include <proton/util.h>
>  #include <stdlib.h>
>  #include <string.h>
>  #include <stdio.h>
> @@ -29,6 +30,7 @@
>  
>  struct pn_messenger_t {
>    char *name;
> +  int timeout;
>    pn_driver_t *driver;
>    pn_connector_t *connectors[1024];
>    size_t size;
> @@ -57,6 +59,7 @@ pn_messenger_t *pn_messenger(const char *name)
>  
>    if (m) {
>      m->name = build_name(name);
> +    m->timeout = -1;
>      m->driver = pn_driver();
>      m->size = 0;
>      m->listeners = 0;
> @@ -73,6 +76,18 @@ const char *pn_messenger_name(pn_messenger_t *messenger)
>    return messenger->name;
>  }
>  
> +int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout)
> +{
> +  if (!messenger) return PN_ARG_ERR;
> +  messenger->timeout = timeout;
> +  return 0;
> +}
> +
> +int pn_messenger_get_timeout(pn_messenger_t *messenger)
> +{
> +  return messenger ? messenger->timeout : 0;
> +}
> +
>  void pn_messenger_free(pn_messenger_t *messenger)
>  {
>    if (messenger) {
> @@ -171,14 +186,28 @@ void pn_messenger_reclaim(pn_messenger_t *messenger, pn_connection_t *conn)
>    }
>  }
>  
> -int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
> +long int millis(struct timeval tv)
> +{
> +  return tv.tv_sec * 1000 + tv.tv_usec/1000;
> +}
> +
> +int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
>  {
>    for (int i = 0; i < messenger->size; i++) {
>      pn_connector_process(messenger->connectors[i]);
>    }
>  
> -  while (!predicate(messenger)) {
> -    pn_driver_wait(messenger->driver, -1);
> +  struct timeval now;
> +  if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
> +  long int deadline = millis(now) + timeout;
> +  bool pred;
> +
> +  while (true) {
> +    pred = predicate(messenger);
> +    int remaining = deadline - millis(now);
> +    if (pred || (timeout >= 0 && remaining < 0)) break;
> +
> +    pn_driver_wait(messenger->driver, remaining);
>  
>      pn_listener_t *l;
>      while ((l = pn_driver_listener(messenger->driver))) {
> @@ -214,47 +243,30 @@ int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_
>          pn_connector_process(c);
>        }
>      }
> +
> +    if (timeout >= 0) {
> +      if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
> +    }
>    }
>  
> -  return 0;
> +  return pred ? 0 : PN_TIMEOUT;
>  }
>  
> -bool pn_messenger_linked(pn_messenger_t *messenger)
> +int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
>  {
> -  for (int i = 0; i < messenger->size; i++) {
> -    pn_connector_t *ctor = messenger->connectors[i];
> -    pn_connection_t *conn = pn_connector_connection(ctor);
> -    pn_state_t state = pn_connection_state(conn);
> -    if ((state == (PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT)) ||
> -        (state == (PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE))) {
> -      return false;
> -    }
> -
> -    if (pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT) ||
> -        pn_link_head(conn, PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE)) {
> -      return false;
> -    }
> -  }
> -
> -  return true;
> +  return pn_messenger_tsync(messenger, predicate, messenger->timeout);
>  }
>  
>  int pn_messenger_start(pn_messenger_t *messenger)
>  {
>    if (!messenger) return PN_ARG_ERR;
> -  return pn_messenger_sync(messenger, pn_messenger_linked);
> +  // right now this is a noop
> +  return 0;
>  }
>  
> -bool pn_messenger_unlinked(pn_messenger_t *messenger)
> +bool pn_messenger_stopped(pn_messenger_t *messenger)
>  {
> -  for (int i = 0; i < messenger->size; i++) {
> -    pn_connector_t *ctor = messenger->connectors[i];
> -    pn_connection_t *conn = pn_connector_connection(ctor);
> -    pn_state_t state = pn_connection_state(conn);
> -    if (state != (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED))
> -      return false;
> -  }
> -  return true;
> +  return messenger->size == 0;
>  }
>  
>  int pn_messenger_stop(pn_messenger_t *messenger)
> @@ -272,7 +284,7 @@ int pn_messenger_stop(pn_messenger_t *messenger)
>      pn_connection_close(conn);
>    }
>  
> -  return pn_messenger_sync(messenger, pn_messenger_unlinked);
> +  return pn_messenger_sync(messenger, pn_messenger_stopped);
>  }
>  
>  static void parse_address(char *address, char **domain, char **name)
> @@ -301,6 +313,18 @@ bool pn_streq(const char *a, const char *b)
>  
>  pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char *domain)
>  {
> +  char buf[strlen(domain) + 1];
> +  if (domain) {
> +    strcpy(buf, domain);
> +  } else {
> +    buf[0] = '\0';
> +  }
> +  char *user = NULL;
> +  char *pass = NULL;
> +  char *host = "0.0.0.0";
> +  char *port = "5672";
> +  parse_url(buf, &user, &pass, &host, &port);
> +
>    for (int i = 0; i < messenger->size; i++) {
>      pn_connection_t *connection = pn_connector_connection(messenger->connectors[i]);
>      const char *container = pn_connection_remote_container(connection);
> @@ -309,12 +333,16 @@ pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char *doma
>        return connection;
>    }
>  
> -  pn_connector_t *connector = pn_connector(messenger->driver, domain, "5672", NULL);
> +  pn_connector_t *connector = pn_connector(messenger->driver, host, port, NULL);
>    if (!connector) return NULL;
>    messenger->connectors[messenger->size++] = connector;
>    pn_sasl_t *sasl = pn_connector_sasl(connector);
> -  pn_sasl_mechanisms(sasl, "ANONYMOUS");
> -  pn_sasl_client(sasl);
> +  if (user) {
> +    pn_sasl_plain(sasl, user, pass);
> +  } else {
> +    pn_sasl_mechanisms(sasl, "ANONYMOUS");
> +    pn_sasl_client(sasl);
> +  }
>    pn_connection_t *connection = pn_connection();
>    pn_connection_set_container(connection, messenger->name);
>    pn_connection_set_hostname(connection, domain);
> @@ -378,11 +406,15 @@ pn_listener_t *pn_messenger_isource(pn_messenger_t *messenger, const char *sourc
>  {
>    char buf[strlen(source) + 1];
>    strcpy(buf, source);
> -  char *domain;
> -  char *name;
> +  char *domain, *name;
>    parse_address(buf, &domain, &name);
> +  char *user = NULL;
> +  char *pass = NULL;
> +  char *host = "0.0.0.0";
> +  char *port = "5672";
> +  parse_url(domain + 1, &user, &pass, &host, &port);
>  
> -  pn_listener_t *listener = pn_listener(messenger->driver, domain + 1, "5672", NULL);
> +  pn_listener_t *listener = pn_listener(messenger->driver, host, port, NULL);
>    if (listener) {
>      messenger->listeners++;
>    }
> @@ -428,6 +460,8 @@ static void outward_munge(pn_messenger_t *mng, pn_message_t *msg)
>    }
>  }
>  
> +bool false_pred(pn_messenger_t *messenger) { return false; }
> +
>  int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
>  {
>    if (!messenger) return PN_ARG_ERR;
> @@ -459,6 +493,7 @@ int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
>          return n;
>        } else {
>          pn_advance(sender);
> +        pn_messenger_tsync(messenger, false_pred, 0);
>          return 0;
>        }
>      }
> @@ -469,8 +504,6 @@ int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
>  
>  bool pn_messenger_sent(pn_messenger_t *messenger)
>  {
> -  //  if (!pn_messenger_linked(messenger)) return false;
> -
>    for (int i = 0; i < messenger->size; i++) {
>      pn_connector_t *ctor = messenger->connectors[i];
>      pn_connection_t *conn = pn_connector_connection(ctor);
> @@ -497,8 +530,6 @@ bool pn_messenger_sent(pn_messenger_t *messenger)
>  
>  bool pn_messenger_rcvd(pn_messenger_t *messenger)
>  {
> -  //  if (!pn_messenger_linked(messenger)) return false;
> -
>    for (int i = 0; i < messenger->size; i++) {
>      pn_connector_t *ctor = messenger->connectors[i];
>      pn_connection_t *conn = pn_connector_connection(ctor);
> @@ -532,6 +563,8 @@ int pn_messenger_recv(pn_messenger_t *messenger, int n)
>  
>  int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
>  {
> +  if (!messenger) return PN_ARG_ERR;
> +
>    for (int i = 0; i < messenger->size; i++) {
>      pn_connector_t *ctor = messenger->connectors[i];
>      pn_connection_t *conn = pn_connector_connection(ctor);
> @@ -545,10 +578,14 @@ int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
>          ssize_t n = pn_recv(l, buf, 1024);
>          pn_settle(d);
>          if (n < 0) return n;
> -        int err = pn_message_decode(msg, buf, n);
> -        if (err) {
> -          return pn_error_format(messenger->error, err, "error decoding message: %s",
> +        if (msg) {
> +          int err = pn_message_decode(msg, buf, n);
> +          if (err) {
> +            return pn_error_format(messenger->error, err, "error decoding message: %s",
>                                   pn_message_error(msg));
> +          } else {
> +            return 0;
> +          }
>          } else {
>            return 0;
>          }
> @@ -564,6 +601,8 @@ int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
>  
>  int pn_messenger_queued(pn_messenger_t *messenger, bool sender)
>  {
> +  if (!messenger) return 0;
> +
>    int result = 0;
>  
>    for (int i = 0; i < messenger->size; i++) {
> diff --git a/tests/proton_tests/__init__.py b/tests/proton_tests/__init__.py
> index a8a4d52..b467cf5 100644
> --- a/tests/proton_tests/__init__.py
> +++ b/tests/proton_tests/__init__.py
> @@ -19,3 +19,4 @@
>  
>  import proton_tests.engine
>  import proton_tests.message
> +import proton_tests.messenger
> diff --git a/tests/proton_tests/messenger.py b/tests/proton_tests/messenger.py
> new file mode 100644
> index 0000000..17161cd
> --- /dev/null
> +++ b/tests/proton_tests/messenger.py
> @@ -0,0 +1,88 @@
> +#
> +# Licensed to the Apache Software Foundation (ASF) under one
> +# or more contributor license agreements.  See the NOTICE file
> +# distributed with this work for additional information
> +# regarding copyright ownership.  The ASF licenses this file
> +# to you under the Apache License, Version 2.0 (the
> +# "License"); you may not use this file except in compliance
> +# with the License.  You may obtain a copy of the License at
> +# 
> +#   http://www.apache.org/licenses/LICENSE-2.0
> +# 
> +# Unless required by applicable law or agreed to in writing,
> +# software distributed under the License is distributed on an
> +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> +# KIND, either express or implied.  See the License for the
> +# specific language governing permissions and limitations
> +# under the License.
> +#
> +
> +import os, common, xproton
> +from xproton import *
> +from threading import Thread
> +
> +class Test(common.Test):
> +
> +  def setup(self):
> +    self.server = pn_messenger("server")
> +    pn_messenger_set_timeout(self.server, 10000)
> +    pn_messenger_start(self.server)
> +    pn_messenger_subscribe(self.server, "//~0.0.0.0:12345")
> +    self.thread = Thread(target=self.run)
> +    self.running = True
> +    self.thread.start()
> +
> +    self.client = pn_messenger("client")
> +    pn_messenger_set_timeout(self.client, 10000)
> +    pn_messenger_start(self.client)
> +
> +  def teardown(self):
> +    self.running = False
> +    msg = pn_message()
> +    pn_message_set_address(msg, "//0.0.0.0:12345")
> +    pn_messenger_put(self.client, msg)
> +    pn_messenger_send(self.client)
> +    pn_messenger_stop(self.client)
> +    self.thread.join()
> +    pn_messenger_free(self.client)
> +    pn_messenger_free(self.server)
> +    self.client = None
> +    self.server = None
> +
> +class MessengerTest(Test):
> +
> +  def run(self):
> +    msg = pn_message()
> +    while self.running:
> +      pn_messenger_recv(self.server, 10)
> +      while pn_messenger_incoming(self.server):
> +        if pn_messenger_get(self.server, msg):
> +          print pn_messenger_error(self.server)
> +        else:
> +          reply_to = pn_message_get_reply_to(msg)
> +          if reply_to:
> +            pn_message_set_address(msg, reply_to)
> +            pn_messenger_put(self.server, msg)
> +    pn_messenger_stop(self.server)
> +
> +  def testSendReceive(self):
> +    msg = pn_message()
> +    pn_message_set_address(msg, "//0.0.0.0:12345")
> +    pn_message_set_subject(msg, "Hello World!")
> +    body = "First the world, then the galaxy!"
> +    pn_message_load(msg, body)
> +    pn_messenger_put(self.client, msg)
> +    pn_messenger_send(self.client)
> +
> +    reply = pn_message()
> +    assert not pn_messenger_recv(self.client, 1)
> +    assert pn_messenger_incoming(self.client) == 1
> +    assert not pn_messenger_get(self.client, reply)
> +
> +    assert pn_message_get_subject(reply) == "Hello World!"
> +    cd, rbod = pn_message_save(reply, 1024)
> +    assert not cd
> +    assert rbod == body
> +
> +    pn_message_free(msg)
> +    pn_message_free(reply)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to