Author: rhs
Date: Thu Jun 28 21:33:07 2012
New Revision: 1355166

URL: http://svn.apache.org/viewvc?rev=1355166&view=rev
Log:
added names to messengers; fixed addressing a bit; added server/client example

Added:
    qpid/proton/trunk/examples/messenger/client.py   (with props)
    qpid/proton/trunk/examples/messenger/server.py   (with props)
Modified:
    qpid/proton/trunk/examples/messenger/recv.py
    qpid/proton/trunk/examples/messenger/send.py
    qpid/proton/trunk/proton-c/CMakeLists.txt
    qpid/proton/trunk/proton-c/bindings/php/CMakeLists.txt
    qpid/proton/trunk/proton-c/bindings/python/CMakeLists.txt
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/message/message.c
    qpid/proton/trunk/proton-c/src/messenger.c

Added: qpid/proton/trunk/examples/messenger/client.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/client.py?rev=1355166&view=auto
==============================================================================
--- qpid/proton/trunk/examples/messenger/client.py (added)
+++ qpid/proton/trunk/examples/messenger/client.py Thu Jun 28 21:33:07 2012
@@ -0,0 +1,53 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import sys, optparse
+from xproton import *
+
+parser = optparse.OptionParser(usage="usage: %prog <addr> <subject>",
+                               description="simple message client")
+
+parser.add_option("-r", "--reply_to", default="replies",
+                  help="address: //<domain>[/<name>] (default %default)")
+
+opts, args = parser.parse_args()
+
+if len(args) != 2:
+  parser.error("incorrect number of arguments")
+
+address, subject = args
+
+mng = pn_messenger(None)  # None: the library picks a UUID-based name
+pn_messenger_start(mng)
+
+msg = pn_message()  # the request; reused below to hold the reply
+pn_message_set_address(msg, address)
+pn_message_set_subject(msg, subject)
+pn_message_set_reply_to(msg, opts.reply_to)
+
+if pn_messenger_put(mng, msg): print pn_messenger_error(mng)
+if pn_messenger_send(mng): print pn_messenger_error(mng)
+
+if opts.reply_to[:2] != "//":  # relative reply-to gets qualified with this messenger's name on put, so wait here
+  if pn_messenger_recv(mng, 1): print pn_messenger_error(mng)
+  elif pn_messenger_get(mng, msg): print pn_messenger_error(mng)
+  else: print pn_message_get_address(msg), pn_message_get_subject(msg)
+
+pn_messenger_stop(mng)
+pn_messenger_free(mng)

Propchange: qpid/proton/trunk/examples/messenger/client.py
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/proton/trunk/examples/messenger/recv.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/recv.py?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/recv.py (original)
+++ qpid/proton/trunk/examples/messenger/recv.py Thu Jun 28 21:33:07 2012
@@ -28,7 +28,7 @@ opts, args = parser.parse_args()
 if not args:
   args = ["//~0.0.0.0"]
 
-mng = pn_messenger()
+mng = pn_messenger(None)
 pn_messenger_start(mng)
 
 for a in args:
@@ -46,9 +46,12 @@ while True:
       print pn_messenger_error(mng)
     else:
       cd, body = pn_message_save(msg, 1024)
-      print pn_message_get_address(msg), \
-          pn_message_get_subject(msg) or "(no subject)", \
-          body
+      if cd:
+        print pn_message_error(msg)
+      else:
+        print pn_message_get_address(msg), \
+            pn_message_get_subject(msg) or "(no subject)", \
+            body
 
 pn_messenger_stop(mng)
 pn_messenger_free(mng)

Modified: qpid/proton/trunk/examples/messenger/send.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/send.py?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/send.py (original)
+++ qpid/proton/trunk/examples/messenger/send.py Thu Jun 28 21:33:07 2012
@@ -29,7 +29,7 @@ opts, args = parser.parse_args()
 if not args:
   args = ["Hello World!"]
 
-mng = pn_messenger()
+mng = pn_messenger(None)
 pn_messenger_start(mng)
 
 msg = pn_message()

Added: qpid/proton/trunk/examples/messenger/server.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/server.py?rev=1355166&view=auto
==============================================================================
--- qpid/proton/trunk/examples/messenger/server.py (added)
+++ qpid/proton/trunk/examples/messenger/server.py Thu Jun 28 21:33:07 2012
@@ -0,0 +1,70 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import sys, optparse
+from xproton import *
+
+parser = optparse.OptionParser(usage="usage: %prog <addr_1> ... <addr_n>",
+                               description="simple message server")
+
+opts, args = parser.parse_args()
+
+if not args:
+  args = ["//~0.0.0.0"]  # default source when no address is given
+
+mng = pn_messenger(None)  # None: the library picks a UUID-based name
+print pn_messenger_name(mng)  # show the generated name
+pn_messenger_start(mng)
+
+for a in args:
+  if pn_messenger_subscribe(mng, a):
+    print pn_messenger_error(mng)
+    break  # stops subscribing at the first failure; NOTE(review): the serve loop below still runs
+
+def dispatch(request, response):  # set the reply subject to "Re: <request subject>" and log it
+  subject = pn_message_get_subject(request)
+  pn_message_set_subject(response, "Re: %s" % subject)
+  print "Dispatched %s" % subject
+
+msg = pn_message()  # receives each incoming request
+reply = pn_message()  # reused for every reply; NOTE(review): never cleared between requests - confirm intended
+
+while True:
+  if pn_messenger_incoming(mng) < 10:  # fewer than 10 queued: ask for more (presumably up to 10 - confirm)
+    if pn_messenger_recv(mng, 10):
+      print pn_messenger_error(mng)
+      break  # a receive error ends the serve loop
+  if pn_messenger_incoming(mng) > 0:
+    if pn_messenger_get(mng, msg):
+      print pn_messenger_error(mng)
+    else:
+      reply_to = pn_message_get_reply_to(msg)
+      cid = pn_message_get_correlation_id(msg)
+      if reply_to:  # address the reply to the requester's reply-to
+        pn_message_set_address(reply, reply_to)
+      if cid:  # copy the request's correlation id onto the reply
+        pn_message_set_correlation_id(reply, cid)
+      dispatch(msg, reply)
+      if pn_messenger_put(mng, reply):
+        print pn_messenger_error(mng)
+      if pn_messenger_send(mng):
+        print pn_messenger_error(mng)
+
+pn_messenger_stop(mng)
+pn_messenger_free(mng)

Propchange: qpid/proton/trunk/examples/messenger/server.py
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/proton/trunk/proton-c/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/CMakeLists.txt?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/CMakeLists.txt Thu Jun 28 21:33:07 2012
@@ -4,6 +4,7 @@ project (Proton)
 
 set (PN_VERSION_MAJOR 0)
 set (PN_VERSION_MINOR 1)
+set (LINK_DEPS uuid)
 
 configure_file (
   "${PROJECT_SOURCE_DIR}/pn_config.h.in"
@@ -57,7 +58,7 @@ if (SWIG_FOUND)
 endif (SWIG_FOUND)
 
 add_executable (proton src/proton.c)
-target_link_libraries (proton qpidproton)
+target_link_libraries (proton qpidproton ${LINK_DEPS})
 
 add_subdirectory(docs/api)
 

Modified: qpid/proton/trunk/proton-c/bindings/php/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/CMakeLists.txt?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/bindings/php/CMakeLists.txt Thu Jun 28 21:33:07 2012
@@ -36,5 +36,5 @@ ELSE()
    set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/php.i PROPERTIES 
SWIG_FLAGS "-I${PROJECT_SOURCE_DIR}/include")
    swig_add_module(cproton php ${CMAKE_CURRENT_SOURCE_DIR}/php.i)
    set_source_files_properties(${swig_generated_file_fullname} PROPERTIES 
COMPILE_FLAGS "${PHP_INCLUDES}")
-   swig_link_libraries(cproton qpidproton)
+   swig_link_libraries(cproton qpidproton ${LINK_DEPS})
 ENDIF()

Modified: qpid/proton/trunk/proton-c/bindings/python/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/CMakeLists.txt?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/bindings/python/CMakeLists.txt Thu Jun 28 21:33:07 2012
@@ -2,4 +2,4 @@ find_package (PythonLibs)
 
 swig_add_module(cproton python python.i)
 include_directories (${PYTHON_INCLUDE_PATH})
-swig_link_libraries(cproton qpidproton ${PYTHON_LIBRARIES})
+swig_link_libraries(cproton qpidproton ${LINK_DEPS} ${PYTHON_LIBRARIES})

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Thu Jun 28 21:33:07 2012
@@ -31,11 +31,22 @@
 
 typedef struct pn_messenger_t pn_messenger_t; /**< Messenger*/
 
-/** Constructs a new Messenger.
+/** Construct a new Messenger with the given name. The name is global.
+ * If a NULL name is supplied, a UUID based name will be chosen.
+ *
+ * @param[in] name the name of the messenger or NULL
  *
  * @return pointer to a new Messenger
  */
-pn_messenger_t *pn_messenger();
+pn_messenger_t *pn_messenger(const char *name);
+
+/** Retrieves the name of a Messenger.
+ *
+ * @param[in] messenger the messenger
+ *
+ * @return the name of the messenger
+ */
+const char *pn_messenger_name(pn_messenger_t *messenger);
 
 /** Frees a Messenger.
  *

Modified: qpid/proton/trunk/proton-c/src/message/message.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/message/message.c?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/message/message.c (original)
+++ qpid/proton/trunk/proton-c/src/message/message.c Thu Jun 28 21:33:07 2012
@@ -418,6 +418,8 @@ int pn_message_decode(pn_message_t *msg,
     msg->body = pn_data(64);
   }
 
+  pn_data_clear(msg->body);
+
   while (size) {
     size_t copy = size;
     pn_data_clear(msg->data);
@@ -641,7 +643,7 @@ int pn_message_save_data(pn_message_t *m
 {
   if (!msg) return PN_ARG_ERR;
 
-  if (!msg->body) {
+  if (!msg->body || pn_data_size(msg->body) == 0) {
     *size = 0;
     return 0;
   }

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Thu Jun 28 21:33:07 2012
@@ -24,9 +24,11 @@
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
+#include <uuid/uuid.h>
 #include "util.h"
 
 struct pn_messenger_t {
+  char *name;
   pn_driver_t *driver;
   pn_connector_t *connectors[1024];
   size_t size;
@@ -36,11 +38,25 @@ struct pn_messenger_t {
   pn_error_t *error;
 };
 
-pn_messenger_t *pn_messenger()
+char *build_name(const char *name)
+{
+  /* Heap copy of name, or a new lower-case UUID string when name is NULL; pn_messenger_free() frees it. */
+  if (name) return pn_strdup(name);
+  char *generated = malloc(37);  /* 36 UUID characters plus the terminating NUL */
+  if (generated) {  /* on allocation failure return NULL rather than write through it */
+    uuid_t uuid;
+    uuid_generate(uuid);
+    uuid_unparse_lower(uuid, generated);
+  }
+  return generated;
+}
+
+pn_messenger_t *pn_messenger(const char *name)
 {
   pn_messenger_t *m = malloc(sizeof(pn_messenger_t));
 
   if (m) {
+    m->name = build_name(name);
     m->driver = pn_driver();
     m->size = 0;
     m->listeners = 0;
@@ -52,9 +68,15 @@ pn_messenger_t *pn_messenger()
   return m;
 }
 
+const char *pn_messenger_name(pn_messenger_t *messenger)
+{ /* Returns the messenger's name, or NULL when messenger is NULL (matches the NULL checks in pn_messenger_free/put). */
+  return messenger ? messenger->name : NULL;
+}
+
 void pn_messenger_free(pn_messenger_t *messenger)
 {
   if (messenger) {
+    free(messenger->name);
     pn_driver_free(messenger->driver);
     pn_error_free(messenger->error);
   }
@@ -166,7 +188,7 @@ int pn_messenger_sync(pn_messenger_t *me
       pn_sasl_server(sasl);
       pn_sasl_done(sasl, PN_SASL_OK);
       pn_connection_t *conn = pn_connection();
-      pn_connection_set_container(conn, pn_listener_context(l));
+      pn_connection_set_container(conn, messenger->name);
       pn_connector_set_connection(c, conn);
       messenger->connectors[messenger->size++] = c;
     }
@@ -294,7 +316,7 @@ pn_connection_t *pn_messenger_domain(pn_
   pn_sasl_mechanisms(sasl, "ANONYMOUS");
   pn_sasl_client(sasl);
   pn_connection_t *connection = pn_connection();
-  pn_connection_set_container(connection, domain);
+  pn_connection_set_container(connection, messenger->name);
   pn_connection_set_hostname(connection, domain);
   pn_connection_open(connection);
   pn_connector_set_connection(connector, connection);
@@ -360,7 +382,7 @@ pn_listener_t *pn_messenger_isource(pn_m
   char *name;
   parse_address(buf, &domain, &name);
 
-  pn_listener_t *listener = pn_listener(messenger->driver, domain + 1, "5672", 
pn_strdup(domain + 1));
+  pn_listener_t *listener = pn_listener(messenger->driver, domain + 1, "5672", 
NULL);
   if (listener) {
     messenger->listeners++;
   }
@@ -369,7 +391,8 @@ pn_listener_t *pn_messenger_isource(pn_m
 
 int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
 {
-  if (strlen(source) >= 3 && source[0] == '/' && source[1] == '/' && source[2] 
== '~') {
+  int len = strlen(source);
+  if (len >= 3 && source[0] == '/' && source[1] == '/' && source[2] == '~') {
     pn_listener_t *lnr = pn_messenger_isource(messenger, source);
     if (lnr) {
       return 0;
@@ -377,7 +400,7 @@ int pn_messenger_subscribe(pn_messenger_
       return pn_error_format(messenger->error, PN_ERR,
                              "unable to subscribe to source: %s", source);
     }
-  } else {
+  } else if (len >= 2 && source[0] == '/' && source[1] == '/') {
     pn_link_t *src = pn_messenger_source(messenger, source);
     if (src) {
       return 0;
@@ -385,6 +408,23 @@ int pn_messenger_subscribe(pn_messenger_
       return pn_error_format(messenger->error, PN_ERR,
                              "unable to subscribe to source: %s", source);
     }
+  } else {
+    return 0;
+  }
+}
+
+static void outward_munge(pn_messenger_t *mng, pn_message_t *msg)
+{ /* Qualify msg's reply-to with this messenger's name; a reply-to that starts with '/' is left unchanged. */
+  const char *address = pn_message_get_reply_to(msg);
+  int len = address ? strlen(address) : 0; /* a NULL reply-to is treated as empty */
+  if (len > 0 && address[0] != '/') { /* relative name: becomes //<messenger name>/<address> */
+    char buf[len + strlen(mng->name) + 4]; /* "//" + name + "/" + address + NUL */
+    sprintf(buf, "//%s/%s", mng->name, address);
+    pn_message_set_reply_to(msg, buf);
+  } else if (len == 0) { /* no reply-to: default to //<messenger name> */
+    char buf[strlen(mng->name) + 4];
+    sprintf(buf, "//%s", mng->name);
+    pn_message_set_reply_to(msg, buf);
   }
 }
 
@@ -392,6 +432,7 @@ int pn_messenger_put(pn_messenger_t *mes
 {
   if (!messenger) return PN_ARG_ERR;
   if (!msg) return pn_error_set(messenger->error, PN_ARG_ERR, "null message");
+  outward_munge(messenger, msg);
   const char *address = pn_message_get_address(msg);
   pn_link_t *sender = pn_messenger_target(messenger, address);
   if (!sender)
@@ -505,11 +546,12 @@ int pn_messenger_get(pn_messenger_t *mes
         pn_settle(d);
         if (n < 0) return n;
         int err = pn_message_decode(msg, buf, n);
-        if (err)
+        if (err) {
           return pn_error_format(messenger->error, err, "error decoding 
message: %s",
                                  pn_message_error(msg));
-        else
+        } else {
           return 0;
+        }
       }
       d = pn_work_next(d);
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to