Author: rhs
Date: Tue Sep 25 03:28:13 2012
New Revision: 1389698

URL: http://svn.apache.org/viewvc?rev=1389698&view=rev
Log:
basic ssl integration for messenger

Modified:
    qpid/proton/trunk/examples/messenger/recv.py
    qpid/proton/trunk/examples/messenger/send.py
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/include/proton/util.h
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/proton-c/src/proton.c
    qpid/proton/trunk/proton-c/src/ssl/openssl.c
    qpid/proton/trunk/proton-c/src/util.c
    qpid/proton/trunk/tests/proton_tests/messenger.py

Modified: qpid/proton/trunk/examples/messenger/recv.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/recv.py?rev=1389698&r1=1389697&r2=1389698&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/recv.py (original)
+++ qpid/proton/trunk/examples/messenger/recv.py Tue Sep 25 03:28:13 2012
@@ -22,13 +22,19 @@ from proton import *
 
 parser = optparse.OptionParser(usage="usage: %prog [options] <addr_1> ... <addr_n>",
                                description="simple message receiver")
+parser.add_option("-c", "--certificate", help="path to certificate file")
+parser.add_option("-k", "--private-key", help="path to private key file")
+parser.add_option("-p", "--password", help="password for private key file")
 
 opts, args = parser.parse_args()
 
 if not args:
-  args = ["//~0.0.0.0"]
+  args = ["amqp://~0.0.0.0"]
 
 mng = Messenger()
+mng.certificate=opts.certificate
+mng.private_key=opts.private_key
+mng.password=opts.password
 mng.start()
 
 for a in args:

Modified: qpid/proton/trunk/examples/messenger/send.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/send.py?rev=1389698&r1=1389697&r2=1389698&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/send.py (original)
+++ qpid/proton/trunk/examples/messenger/send.py Tue Sep 25 03:28:13 2012
@@ -22,7 +22,7 @@ from proton import *
 
 parser = optparse.OptionParser(usage="usage: %prog [options] <msg_1> ... <msg_n>",
                                description="simple message sender")
-parser.add_option("-a", "--address", default="//0.0.0.0",
+parser.add_option("-a", "--address", default="amqp://0.0.0.0",
                   help="address: //<domain>[/<name>] (default %default)")
 
 opts, args = parser.parse_args()

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1389698&r1=1389697&r2=1389698&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Tue Sep 25 03:28:13 2012
@@ -38,6 +38,38 @@ class Messenger(object):
     return pn_messenger_name(self._mng)
 
   @property
+  def certificate(self):
+    return pn_messenger_get_certificate(self._mng)
+
+  @certificate.setter
+  def certificate(self, value):
+    self._check(pn_messenger_set_certificate(self._mng, value))
+
+  @property
+  def private_key(self):
+    return pn_messenger_get_private_key(self._mng)
+
+  @private_key.setter
+  def private_key(self, value):
+    self._check(pn_messenger_set_private_key(self._mng, value))
+
+  @property
+  def password(self):
+    return pn_messenger_get_password(self._mng)
+
+  @password.setter
+  def password(self, value):
+    self._check(pn_messenger_set_password(self._mng, value))
+
+  @property
+  def trusted_certificates(self):
+    return pn_messenger_get_trusted_certificates(self._mng)
+
+  @trusted_certificates.setter
+  def trusted_certificates(self, value):
+    self._check(pn_messenger_set_trusted_certificates(self._mng, value))
+
+  @property
   def timeout(self):
     return pn_messenger_get_timeout(self._mng)
 

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1389698&r1=1389697&r2=1389698&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Tue Sep 25 03:28:13 2012
@@ -52,6 +52,70 @@ pn_messenger_t *pn_messenger(const char 
  */
 const char *pn_messenger_name(pn_messenger_t *messenger);
 
+/** Sets the certificate file for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @param[in] certificate a path to a certificate file
+ *
+ * @return an error code of zero if there is no error
+ */
+int pn_messenger_set_certificate(pn_messenger_t *messenger, const char *certificate);
+
+/** Gets the certificate file for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @return the certificate file path
+ */
+const char *pn_messenger_get_certificate(pn_messenger_t *messenger);
+
+/** Sets the private key file for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @param[in] private_key a path to a private key file
+ *
+ * @return an error code of zero if there is no error
+ */
+int pn_messenger_set_private_key(pn_messenger_t *messenger, const char *private_key);
+
+/** Gets the private key file for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @return the private key file path
+ */
+const char *pn_messenger_get_private_key(pn_messenger_t *messenger);
+
+/** Sets the private key password for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @param[in] password the password for the private key file
+ *
+ * @return an error code of zero if there is no error
+ */
+int pn_messenger_set_password(pn_messenger_t *messenger, const char *password);
+
+/** Gets the private key file password for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @return password for the private key file
+ */
+const char *pn_messenger_get_password(pn_messenger_t *messenger);
+
+/** Sets the trusted certificates database for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @param[in] cert_db a path to the certificates database
+ *
+ * @return an error code of zero if there is no error
+ */
+int pn_messenger_set_trusted_certificates(pn_messenger_t *messenger, const char *cert_db);
+
+/** Gets the trusted certificates database for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @return path to the trusted certificates database
+ */
+const char *pn_messenger_get_trusted_certificates(pn_messenger_t *messenger);
+
 /** Sets the timeout for a Messenger. A negative timeout means
  * infinite.
  *

Modified: qpid/proton/trunk/proton-c/include/proton/util.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/util.h?rev=1389698&r1=1389697&r2=1389698&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/util.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/util.h Tue Sep 25 03:28:13 2012
@@ -28,7 +28,7 @@
 extern "C" {
 #endif
 
-void parse_url(char *url, char **user, char **pass, char **host, char **port);
+void parse_url(char *url, char **scheme, char **user, char **pass, char **host, char **port, char **path);
 void pn_fatal(char *fmt, ...);
 void pn_vfatal(char *fmt, va_list ap);
 

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1389698&r1=1389697&r2=1389698&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Tue Sep 25 03:28:13 2012
@@ -22,6 +22,7 @@
 #include <proton/messenger.h>
 #include <proton/driver.h>
 #include <proton/util.h>
+#include <proton/ssl.h>
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
@@ -30,6 +31,10 @@
 
 struct pn_messenger_t {
   char *name;
+  char *certificate;
+  char *private_key;
+  char *password;
+  char *trusted_certificates;
   int timeout;
   pn_driver_t *driver;
   int credit;
@@ -56,6 +61,10 @@ pn_messenger_t *pn_messenger(const char 
 
   if (m) {
     m->name = build_name(name);
+    m->certificate = NULL;
+    m->private_key = NULL;
+    m->password = NULL;
+    m->trusted_certificates = NULL;
     m->timeout = -1;
     m->driver = pn_driver();
     m->credit = 0;
@@ -71,6 +80,54 @@ const char *pn_messenger_name(pn_messeng
   return messenger->name;
 }
 
+int pn_messenger_set_certificate(pn_messenger_t *messenger, const char *certificate)
+{
+  if (messenger->certificate) free(messenger->certificate);
+  messenger->certificate = pn_strdup(certificate);
+  return 0;
+}
+
+const char *pn_messenger_get_certificate(pn_messenger_t *messenger)
+{
+  return messenger->certificate;
+}
+
+int pn_messenger_set_private_key(pn_messenger_t *messenger, const char *private_key)
+{
+  if (messenger->private_key) free(messenger->private_key);
+  messenger->private_key = pn_strdup(private_key);
+  return 0;
+}
+
+const char *pn_messenger_get_private_key(pn_messenger_t *messenger)
+{
+  return messenger->private_key;
+}
+
+int pn_messenger_set_password(pn_messenger_t *messenger, const char *password)
+{
+  if (messenger->password) free(messenger->password);
+  messenger->password = pn_strdup(password);
+  return 0;
+}
+
+const char *pn_messenger_get_password(pn_messenger_t *messenger)
+{
+  return messenger->password;
+}
+
+int pn_messenger_set_trusted_certificates(pn_messenger_t *messenger, const char *trusted_certificates)
+{
+  if (messenger->trusted_certificates) free(messenger->trusted_certificates);
+  messenger->trusted_certificates = pn_strdup(trusted_certificates);
+  return 0;
+}
+
+const char *pn_messenger_get_trusted_certificates(pn_messenger_t *messenger)
+{
+  return messenger->trusted_certificates;
+}
+
 int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout)
 {
   if (!messenger) return PN_ARG_ERR;
@@ -87,6 +144,10 @@ void pn_messenger_free(pn_messenger_t *m
 {
   if (messenger) {
     free(messenger->name);
+    free(messenger->certificate);
+    free(messenger->private_key);
+    free(messenger->password);
+    free(messenger->trusted_certificates);
     pn_driver_free(messenger->driver);
     pn_error_free(messenger->error);
     free(messenger);
@@ -211,8 +272,21 @@ int pn_messenger_tsync(pn_messenger_t *m
 
     pn_listener_t *l;
     while ((l = pn_driver_listener(messenger->driver))) {
+      char *scheme = pn_listener_context(l);
       pn_connector_t *c = pn_listener_accept(l);
-      pn_sasl_t *sasl = pn_connector_sasl(c);
+      pn_transport_t *t = pn_connector_transport(c);
+      pn_ssl_t *ssl = pn_ssl(t);
+      pn_ssl_init(ssl, PN_SSL_MODE_SERVER);
+      if (messenger->certificate) {
+        pn_ssl_set_credentials(ssl, messenger->certificate,
+                               messenger->private_key,
+                               messenger->password);
+      }
+      pn_ssl_set_peer_authentication(ssl, PN_SSL_NO_VERIFY_PEER, NULL);
+      if (!(scheme && !strcmp(scheme, "amqps"))) {
+        pn_ssl_allow_unsecured_client(ssl);
+      }
+      pn_sasl_t *sasl = pn_sasl(t);
       pn_sasl_mechanisms(sasl, "ANONYMOUS");
       pn_sasl_server(sasl);
       pn_sasl_done(sasl, PN_SASL_OK);
@@ -282,50 +356,46 @@ int pn_messenger_stop(pn_messenger_t *me
     pn_listener_close(l);
     pn_listener_t *prev = l;
     l = pn_listener_next(l);
+    free(pn_listener_context(prev));
     pn_listener_free(prev);
   }
 
   return pn_messenger_sync(messenger, pn_messenger_stopped);
 }
 
-static void parse_address(char *address, char **domain, char **name)
-{
-  *domain = NULL;
-  *name = NULL;
-
-  if (!address) return;
-  if (address[0] == '/' && address[1] == '/') {
-    *domain = address + 2;
-    for (char *c = *domain; *c; c++) {
-      if (*c == '/') {
-        *c = '\0';
-        *name = c + 1;
-        break;
-      }
-    }
-  } else {
-    *name = address;
-  }
-}
-
 bool pn_streq(const char *a, const char *b)
 {
   return a == b || (a && b && !strcmp(a, b));
 }
 
-pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char *domain)
+pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *address, char **name)
 {
-  char buf[domain ? strlen(domain) + 1 : 1];
-  if (domain) {
-    strcpy(buf, domain);
+  size_t size = address ? strlen(address) + 1 : 1;
+  char buf[size];
+  if (address) {
+    strcpy(buf, address);
   } else {
     buf[0] = '\0';
   }
+  char *scheme = NULL;
   char *user = NULL;
   char *pass = NULL;
   char *host = "0.0.0.0";
-  char *port = "5672";
-  parse_url(buf, &user, &pass, &host, &port);
+  char *port = NULL;
+  parse_url(buf, &scheme, &user, &pass, &host, &port, name);
+
+  char domain[size];
+  domain[0] = '\0';
+
+  if (user) {
+    strcat(domain, user);
+    strcat(domain, "@");
+  }
+  strcat(domain, host);
+  if (port) {
+    strcat(domain, ":");
+    strcat(domain, port);
+  }
 
   pn_connector_t *ctor = pn_connector_head(messenger->driver);
   while (ctor) {
@@ -337,9 +407,26 @@ pn_connection_t *pn_messenger_domain(pn_
     ctor = pn_connector_next(ctor);
   }
 
-  pn_connector_t *connector = pn_connector(messenger->driver, host, port, NULL);
+  pn_connector_t *connector = pn_connector(messenger->driver, host, port ? port : "5672", NULL);
   if (!connector) return NULL;
-  pn_sasl_t *sasl = pn_connector_sasl(connector);
+  pn_transport_t *transport = pn_connector_transport(connector);
+  if (scheme && !strcmp(scheme, "amqps")) {
+    pn_ssl_t *ssl = pn_ssl(transport);
+    pn_ssl_init(ssl, PN_SSL_MODE_CLIENT);
+    if (messenger->certificate && messenger->private_key) {
+      pn_ssl_set_credentials(ssl, messenger->certificate,
+                             messenger->private_key,
+                             messenger->password);
+    }
+    if (messenger->trusted_certificates) {
+      pn_ssl_set_trusted_ca_db(ssl, messenger->trusted_certificates);
+      pn_ssl_set_peer_authentication(ssl, PN_SSL_VERIFY_PEER, NULL);
+    } else {
+      pn_ssl_set_peer_authentication(ssl, PN_SSL_NO_VERIFY_PEER, NULL);
+    }
+  }
+
+  pn_sasl_t *sasl = pn_sasl(transport);
   if (user) {
     pn_sasl_plain(sasl, user, pass);
   } else {
@@ -357,17 +444,14 @@ pn_connection_t *pn_messenger_domain(pn_
 
 pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, bool sender)
 {
-  char buf[(address ? strlen(address) : 0) + 1];
+  char copy[(address ? strlen(address) : 0) + 1];
   if (address) {
-    strcpy(buf, address);
+    strcpy(copy, address);
   } else {
-    buf[0] = '\0';
+    copy[0] = '\0';
   }
-  char *domain;
-  char *name;
-  parse_address(address ? buf : NULL, &domain, &name);
-
-  pn_connection_t *connection = pn_messenger_domain(messenger, domain);
+  char *name = NULL;
+  pn_connection_t *connection = pn_messenger_resolve(messenger, copy, &name);
   if (!connection) return NULL;
 
   pn_link_t *link = pn_link_head(connection, PN_LOCAL_ACTIVE);
@@ -405,26 +489,22 @@ pn_link_t *pn_messenger_target(pn_messen
   return pn_messenger_link(messenger, target, true);
 }
 
-pn_listener_t *pn_messenger_isource(pn_messenger_t *messenger, const char *source)
+int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
 {
-  char buf[strlen(source) + 1];
-  strcpy(buf, source);
-  char *domain, *name;
-  parse_address(buf, &domain, &name);
+  char copy[strlen(source) + 1];
+  strcpy(copy, source);
+
+  char *scheme = NULL;
   char *user = NULL;
   char *pass = NULL;
   char *host = "0.0.0.0";
   char *port = "5672";
-  parse_url(domain + 1, &user, &pass, &host, &port);
+  char *path = NULL;
 
-  return pn_listener(messenger->driver, host, port, NULL);
-}
+  parse_url(copy, &scheme, &user, &pass, &host, &port, &path);
 
-int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
-{
-  int len = strlen(source);
-  if (len >= 3 && source[0] == '/' && source[1] == '/' && source[2] == '~') {
-    pn_listener_t *lnr = pn_messenger_isource(messenger, source);
+  if (host[0] == '~') {
+    pn_listener_t *lnr = pn_listener(messenger->driver, host + 1, port, pn_strdup(scheme));
     if (lnr) {
       return 0;
     } else {
@@ -432,7 +512,7 @@ int pn_messenger_subscribe(pn_messenger_
                              "unable to subscribe to source: %s (%s)", source,
                              pn_driver_error(messenger->driver));
     }
-  } else if (len >= 2 && source[0] == '/' && source[1] == '/') {
+  } else if (scheme) {
     pn_link_t *src = pn_messenger_source(messenger, source);
     if (src) {
       return 0;
@@ -452,11 +532,11 @@ static void outward_munge(pn_messenger_t
   int len = address ? strlen(address) : 0;
   if (len > 0 && address[0] != '/') {
     char buf[len + strlen(mng->name) + 4];
-    sprintf(buf, "//%s/%s", mng->name, address);
+    sprintf(buf, "amqp://%s/%s", mng->name, address);
     pn_message_set_reply_to(msg, buf);
   } else if (len == 0) {
     char buf[strlen(mng->name) + 4];
-    sprintf(buf, "//%s", mng->name);
+    sprintf(buf, "amqp://%s", mng->name);
     pn_message_set_reply_to(msg, buf);
   }
 }

Modified: qpid/proton/trunk/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1389698&r1=1389697&r2=1389698&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/proton.c (original)
+++ qpid/proton/trunk/proton-c/src/proton.c Tue Sep 25 03:28:13 2012
@@ -441,12 +441,14 @@ int main(int argc, char **argv)
     }
   }
 
+  char *scheme = NULL;
   char *user = NULL;
   char *pass = NULL;
   char *host = "0.0.0.0";
   char *port = "5672";
+  char *path = NULL;
 
-  parse_url(url, &user, &pass, &host, &port);
+  parse_url(url, &scheme, &user, &pass, &host, &port, &path);
 
   pn_driver_t *drv = pn_driver();
   if (url) {

Modified: qpid/proton/trunk/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/openssl.c?rev=1389698&r1=1389697&r2=1389698&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/trunk/proton-c/src/ssl/openssl.c Tue Sep 25 03:28:13 2012
@@ -115,6 +115,15 @@ static void _log(pn_ssl_t *ssl, const ch
   }
 }
 
+static void _log_ssl_error(pn_ssl_t *ssl)
+{
+  unsigned long err = ERR_get_error();
+  while (err) {
+    char *msg = ERR_error_string(err, NULL);
+    _log(ssl, "%s\n", msg);
+    err = ERR_get_error();
+  }
+}
 
 // @todo replace with a "reasonable" default (?), allow application to register its own
 // callback.
@@ -513,6 +522,7 @@ static ssize_t process_input_ssl( pn_tra
         _log( ssl, "Read %d bytes from socket for app\n", written );
       } else {
         if (!BIO_should_retry(ssl->bio_ssl)) {
+          _log_ssl_error(ssl);
           _log(ssl, "Read from SSL socket failed - SSL connection closed!!\n");
           ssl->ssl_closed = (ssl->app_closed) ? ssl->app_closed : PN_EOS;
           start_ssl_shutdown(ssl);
@@ -627,6 +637,7 @@ static ssize_t process_output_ssl( pn_tr
         _log( ssl, "Wrote %d bytes from app to socket\n", written );
       } else {
         if (!BIO_should_retry(ssl->bio_ssl)) {
+          _log_ssl_error(ssl);
           _log(ssl, "Write to SSL socket failed - SSL connection closed!!\n");
           ssl->ssl_closed = (ssl->app_closed) ? ssl->app_closed : PN_EOS;
           start_ssl_shutdown(ssl);

Modified: qpid/proton/trunk/proton-c/src/util.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/util.c?rev=1389698&r1=1389697&r2=1389698&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/util.c (original)
+++ qpid/proton/trunk/proton-c/src/util.c Tue Sep 25 03:28:13 2012
@@ -29,6 +29,7 @@
 #include <ctype.h>
 #include <string.h>
 #include <proton/error.h>
+#include <proton/util.h>
 
 ssize_t pn_quote_data(char *dst, size_t capacity, const char *src, size_t size)
 {
@@ -72,27 +73,39 @@ void pn_print_data(const char *bytes, si
   pn_fprint_data(stdout, bytes, size);
 }
 
-void parse_url(char *url, char **user, char **pass, char **host, char **port)
+void parse_url(char *url, char **scheme, char **user, char **pass, char **host, char **port, char **path)
 {
   if (url) {
+    char *scheme_end = strstr(url, "://");
+    if (scheme_end) {
+      *scheme_end = '\0';
+      *scheme = url;
+      url = scheme_end + 3;
+    }
+
     char *at = index(url, '@');
-    char *hp;
     if (at) {
       *at = '\0';
-      *user = url;
-      char *colon = index(url, ':');
+      char *up = url;
+      *user = up;
+      url = at + 1;
+      char *colon = index(up, ':');
       if (colon) {
         *colon = '\0';
         *pass = colon + 1;
       }
-      hp = at + 1;
-    } else {
-      hp = url;
     }
 
-    *host = hp;
+    char *slash = index(url, '/');
+    if (slash) {
+      *slash = '\0';
+      *host = url;
+      url = slash + 1;
+    } else {
+      *host = url;
+    }
 
-    char *colon = index(hp, ':');
+    char *colon = index(*host, ':');
     if (colon) {
       *colon = '\0';
       *port = colon + 1;

Modified: qpid/proton/trunk/tests/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/messenger.py?rev=1389698&r1=1389697&r2=1389698&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/proton_tests/messenger.py Tue Sep 25 03:28:13 2012
@@ -27,7 +27,7 @@ class Test(common.Test):
     self.server = pn_messenger("server")
     pn_messenger_set_timeout(self.server, 10000)
     pn_messenger_start(self.server)
-    pn_messenger_subscribe(self.server, "//~0.0.0.0:12345")
+    pn_messenger_subscribe(self.server, "amqp://~0.0.0.0:12345")
     self.thread = Thread(target=self.run)
     self.running = True
     self.thread.start()
@@ -39,7 +39,7 @@ class Test(common.Test):
   def teardown(self):
     self.running = False
     msg = pn_message()
-    pn_message_set_address(msg, "//0.0.0.0:12345")
+    pn_message_set_address(msg, "amqp://0.0.0.0:12345")
     pn_messenger_put(self.client, msg)
     pn_messenger_send(self.client)
     pn_messenger_stop(self.client)
@@ -69,7 +69,7 @@ class MessengerTest(Test):
 
   def testSendReceive(self):
     msg = pn_message()
-    pn_message_set_address(msg, "//0.0.0.0:12345")
+    pn_message_set_address(msg, "amqp://0.0.0.0:12345")
     pn_message_set_subject(msg, "Hello World!")
     body = "First the world, then the galaxy!"
     pn_message_load(msg, body)
@@ -94,5 +94,5 @@ class MessengerTest(Test):
     pn_message_set_address(msg, "totally-bogus-address")
     assert pn_messenger_put(self.client, msg) == PN_ERR
     err = pn_messenger_error(self.client)
-    assert "unable to send to address: totally-bogus-address (getaddrinfo:" in err, err
+    assert "unable to send to address: totally-bogus-address (" in err, err
     pn_message_free(msg)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to