Author: rhs
Date: Thu Jun 28 21:33:07 2012
New Revision: 1355166
URL: http://svn.apache.org/viewvc?rev=1355166&view=rev
Log:
added names to messengers; fixed addressing a bit; added server/client example
Added:
qpid/proton/trunk/examples/messenger/client.py (with props)
qpid/proton/trunk/examples/messenger/server.py (with props)
Modified:
qpid/proton/trunk/examples/messenger/recv.py
qpid/proton/trunk/examples/messenger/send.py
qpid/proton/trunk/proton-c/CMakeLists.txt
qpid/proton/trunk/proton-c/bindings/php/CMakeLists.txt
qpid/proton/trunk/proton-c/bindings/python/CMakeLists.txt
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/message/message.c
qpid/proton/trunk/proton-c/src/messenger.c
Added: qpid/proton/trunk/examples/messenger/client.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/client.py?rev=1355166&view=auto
==============================================================================
--- qpid/proton/trunk/examples/messenger/client.py (added)
+++ qpid/proton/trunk/examples/messenger/client.py Thu Jun 28 21:33:07 2012
@@ -0,0 +1,53 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import sys, optparse
+from xproton import *
+
+parser = optparse.OptionParser(usage="usage: %prog <addr> <subject>",
+ description="simple message server")
+
+parser.add_option("-r", "--reply_to", default="replies",
+ help="address: //<domain>[/<name>] (default %default)")
+
+opts, args = parser.parse_args()
+
+if len(args) != 2:
+ parser.error("incorrect number of arguments")
+
+address, subject = args
+
+mng = pn_messenger(None)
+pn_messenger_start(mng)
+
+msg = pn_message()
+pn_message_set_address(msg, address)
+pn_message_set_subject(msg, subject)
+pn_message_set_reply_to(msg, opts.reply_to)
+
+if pn_messenger_put(mng, msg): print pn_messenger_error(mng)
+if pn_messenger_send(mng): print pn_messenger_error(mng)
+
+if opts.reply_to[:2] != "//":
+ if pn_messenger_recv(mng, 1): print pn_messenger_error(mng)
+ elif pn_messenger_get(mng, msg): print pn_messenger_error(mng)
+ else: print pn_message_get_address(msg), pn_message_get_subject(msg)
+
+pn_messenger_stop(mng)
+pn_messenger_free(mng)
Propchange: qpid/proton/trunk/examples/messenger/client.py
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/proton/trunk/examples/messenger/recv.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/recv.py?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/recv.py (original)
+++ qpid/proton/trunk/examples/messenger/recv.py Thu Jun 28 21:33:07 2012
@@ -28,7 +28,7 @@ opts, args = parser.parse_args()
if not args:
args = ["//~0.0.0.0"]
-mng = pn_messenger()
+mng = pn_messenger(None)
pn_messenger_start(mng)
for a in args:
@@ -46,9 +46,12 @@ while True:
print pn_messenger_error(mng)
else:
cd, body = pn_message_save(msg, 1024)
- print pn_message_get_address(msg), \
- pn_message_get_subject(msg) or "(no subject)", \
- body
+ if cd:
+ print pn_message_error(msg)
+ else:
+ print pn_message_get_address(msg), \
+ pn_message_get_subject(msg) or "(no subject)", \
+ body
pn_messenger_stop(mng)
pn_messenger_free(mng)
Modified: qpid/proton/trunk/examples/messenger/send.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/send.py?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/send.py (original)
+++ qpid/proton/trunk/examples/messenger/send.py Thu Jun 28 21:33:07 2012
@@ -29,7 +29,7 @@ opts, args = parser.parse_args()
if not args:
args = ["Hello World!"]
-mng = pn_messenger()
+mng = pn_messenger(None)
pn_messenger_start(mng)
msg = pn_message()
Added: qpid/proton/trunk/examples/messenger/server.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/server.py?rev=1355166&view=auto
==============================================================================
--- qpid/proton/trunk/examples/messenger/server.py (added)
+++ qpid/proton/trunk/examples/messenger/server.py Thu Jun 28 21:33:07 2012
@@ -0,0 +1,70 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import sys, optparse
+from xproton import *
+
+parser = optparse.OptionParser(usage="usage: %prog <addr_1> ... <addr_n>",
+ description="simple message server")
+
+opts, args = parser.parse_args()
+
+if not args:
+ args = ["//~0.0.0.0"]
+
+mng = pn_messenger(None)
+print pn_messenger_name(mng)
+pn_messenger_start(mng)
+
+for a in args:
+ if pn_messenger_subscribe(mng, a):
+ print pn_messenger_error(mng)
+ break
+
+def dispatch(request, response):
+ subject = pn_message_get_subject(request)
+ pn_message_set_subject(response, "Re: %s" % subject)
+ print "Dispatched %s" % subject
+
+msg = pn_message()
+reply = pn_message()
+
+while True:
+ if pn_messenger_incoming(mng) < 10:
+ if pn_messenger_recv(mng, 10):
+ print pn_messenger_error(mng)
+ break
+ if pn_messenger_incoming(mng) > 0:
+ if pn_messenger_get(mng, msg):
+ print pn_messenger_error(mng)
+ else:
+ reply_to = pn_message_get_reply_to(msg)
+ cid = pn_message_get_correlation_id(msg)
+ if reply_to:
+ pn_message_set_address(reply, reply_to)
+ if cid:
+ pn_message_set_correlation_id(reply, cid)
+ dispatch(msg, reply)
+ if pn_messenger_put(mng, reply):
+ print pn_messenger_error(mng)
+ if pn_messenger_send(mng):
+ print pn_messenger_error(mng)
+
+pn_messenger_stop(mng)
+pn_messenger_free(mng)
Propchange: qpid/proton/trunk/examples/messenger/server.py
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/proton/trunk/proton-c/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/CMakeLists.txt?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/CMakeLists.txt Thu Jun 28 21:33:07 2012
@@ -4,6 +4,7 @@ project (Proton)
set (PN_VERSION_MAJOR 0)
set (PN_VERSION_MINOR 1)
+set (LINK_DEPS uuid)
configure_file (
"${PROJECT_SOURCE_DIR}/pn_config.h.in"
@@ -57,7 +58,7 @@ if (SWIG_FOUND)
endif (SWIG_FOUND)
add_executable (proton src/proton.c)
-target_link_libraries (proton qpidproton)
+target_link_libraries (proton qpidproton ${LINK_DEPS})
add_subdirectory(docs/api)
Modified: qpid/proton/trunk/proton-c/bindings/php/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/CMakeLists.txt?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/bindings/php/CMakeLists.txt Thu Jun 28 21:33:07
2012
@@ -36,5 +36,5 @@ ELSE()
set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/php.i PROPERTIES
SWIG_FLAGS "-I${PROJECT_SOURCE_DIR}/include")
swig_add_module(cproton php ${CMAKE_CURRENT_SOURCE_DIR}/php.i)
set_source_files_properties(${swig_generated_file_fullname} PROPERTIES
COMPILE_FLAGS "${PHP_INCLUDES}")
- swig_link_libraries(cproton qpidproton)
+ swig_link_libraries(cproton qpidproton ${LINK_DEPS})
ENDIF()
Modified: qpid/proton/trunk/proton-c/bindings/python/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/CMakeLists.txt?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/bindings/python/CMakeLists.txt Thu Jun 28
21:33:07 2012
@@ -2,4 +2,4 @@ find_package (PythonLibs)
swig_add_module(cproton python python.i)
include_directories (${PYTHON_INCLUDE_PATH})
-swig_link_libraries(cproton qpidproton ${PYTHON_LIBRARIES})
+swig_link_libraries(cproton qpidproton ${LINK_DEPS} ${PYTHON_LIBRARIES})
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Thu Jun 28 21:33:07
2012
@@ -31,11 +31,22 @@
typedef struct pn_messenger_t pn_messenger_t; /**< Messenger*/
-/** Constructs a new Messenger.
+/** Construct a new Messenger with the given name. The name is global.
+ * If a NULL name is supplied, a UUID based name will be chosen.
+ *
+ * @param[in] name the name of the messenger or NULL
*
* @return pointer to a new Messenger
*/
-pn_messenger_t *pn_messenger();
+pn_messenger_t *pn_messenger(const char *name);
+
+/** Retrieves the name of a Messenger.
+ *
+ * @param[in] messenger the messenger
+ *
+ * @return the name of the messenger
+ */
+const char *pn_messenger_name(pn_messenger_t *messenger);
/** Frees a Messenger.
*
Modified: qpid/proton/trunk/proton-c/src/message/message.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/message/message.c?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/message/message.c (original)
+++ qpid/proton/trunk/proton-c/src/message/message.c Thu Jun 28 21:33:07 2012
@@ -418,6 +418,8 @@ int pn_message_decode(pn_message_t *msg,
msg->body = pn_data(64);
}
+ pn_data_clear(msg->body);
+
while (size) {
size_t copy = size;
pn_data_clear(msg->data);
@@ -641,7 +643,7 @@ int pn_message_save_data(pn_message_t *m
{
if (!msg) return PN_ARG_ERR;
- if (!msg->body) {
+ if (!msg->body || pn_data_size(msg->body) == 0) {
*size = 0;
return 0;
}
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1355166&r1=1355165&r2=1355166&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Thu Jun 28 21:33:07 2012
@@ -24,9 +24,11 @@
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
+#include <uuid/uuid.h>
#include "util.h"
struct pn_messenger_t {
+ char *name;
pn_driver_t *driver;
pn_connector_t *connectors[1024];
size_t size;
@@ -36,11 +38,25 @@ struct pn_messenger_t {
pn_error_t *error;
};
-pn_messenger_t *pn_messenger()
+char *build_name(const char *name)
+{
+ if (name) {
+ return pn_strdup(name);
+ } else {
+ char *generated = malloc(37*sizeof(char));
+ uuid_t uuid;
+ uuid_generate(uuid);
+ uuid_unparse_lower(uuid, generated);
+ return generated;
+ }
+}
+
+pn_messenger_t *pn_messenger(const char *name)
{
pn_messenger_t *m = malloc(sizeof(pn_messenger_t));
if (m) {
+ m->name = build_name(name);
m->driver = pn_driver();
m->size = 0;
m->listeners = 0;
@@ -52,9 +68,15 @@ pn_messenger_t *pn_messenger()
return m;
}
+const char *pn_messenger_name(pn_messenger_t *messenger)
+{
+ return messenger->name;
+}
+
void pn_messenger_free(pn_messenger_t *messenger)
{
if (messenger) {
+ free(messenger->name);
pn_driver_free(messenger->driver);
pn_error_free(messenger->error);
}
@@ -166,7 +188,7 @@ int pn_messenger_sync(pn_messenger_t *me
pn_sasl_server(sasl);
pn_sasl_done(sasl, PN_SASL_OK);
pn_connection_t *conn = pn_connection();
- pn_connection_set_container(conn, pn_listener_context(l));
+ pn_connection_set_container(conn, messenger->name);
pn_connector_set_connection(c, conn);
messenger->connectors[messenger->size++] = c;
}
@@ -294,7 +316,7 @@ pn_connection_t *pn_messenger_domain(pn_
pn_sasl_mechanisms(sasl, "ANONYMOUS");
pn_sasl_client(sasl);
pn_connection_t *connection = pn_connection();
- pn_connection_set_container(connection, domain);
+ pn_connection_set_container(connection, messenger->name);
pn_connection_set_hostname(connection, domain);
pn_connection_open(connection);
pn_connector_set_connection(connector, connection);
@@ -360,7 +382,7 @@ pn_listener_t *pn_messenger_isource(pn_m
char *name;
parse_address(buf, &domain, &name);
- pn_listener_t *listener = pn_listener(messenger->driver, domain + 1, "5672",
pn_strdup(domain + 1));
+ pn_listener_t *listener = pn_listener(messenger->driver, domain + 1, "5672",
NULL);
if (listener) {
messenger->listeners++;
}
@@ -369,7 +391,8 @@ pn_listener_t *pn_messenger_isource(pn_m
int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
{
- if (strlen(source) >= 3 && source[0] == '/' && source[1] == '/' && source[2]
== '~') {
+ int len = strlen(source);
+ if (len >= 3 && source[0] == '/' && source[1] == '/' && source[2] == '~') {
pn_listener_t *lnr = pn_messenger_isource(messenger, source);
if (lnr) {
return 0;
@@ -377,7 +400,7 @@ int pn_messenger_subscribe(pn_messenger_
return pn_error_format(messenger->error, PN_ERR,
"unable to subscribe to source: %s", source);
}
- } else {
+ } else if (len >= 2 && source[0] == '/' && source[1] == '/') {
pn_link_t *src = pn_messenger_source(messenger, source);
if (src) {
return 0;
@@ -385,6 +408,23 @@ int pn_messenger_subscribe(pn_messenger_
return pn_error_format(messenger->error, PN_ERR,
"unable to subscribe to source: %s", source);
}
+ } else {
+ return 0;
+ }
+}
+
+static void outward_munge(pn_messenger_t *mng, pn_message_t *msg)
+{
+ const char *address = pn_message_get_reply_to(msg);
+ int len = address ? strlen(address) : 0;
+ if (len > 0 && address[0] != '/') {
+ char buf[len + strlen(mng->name) + 4];
+ sprintf(buf, "//%s/%s", mng->name, address);
+ pn_message_set_reply_to(msg, buf);
+ } else if (len == 0) {
+ char buf[strlen(mng->name) + 4];
+ sprintf(buf, "//%s", mng->name);
+ pn_message_set_reply_to(msg, buf);
}
}
@@ -392,6 +432,7 @@ int pn_messenger_put(pn_messenger_t *mes
{
if (!messenger) return PN_ARG_ERR;
if (!msg) return pn_error_set(messenger->error, PN_ARG_ERR, "null message");
+ outward_munge(messenger, msg);
const char *address = pn_message_get_address(msg);
pn_link_t *sender = pn_messenger_target(messenger, address);
if (!sender)
@@ -505,11 +546,12 @@ int pn_messenger_get(pn_messenger_t *mes
pn_settle(d);
if (n < 0) return n;
int err = pn_message_decode(msg, buf, n);
- if (err)
+ if (err) {
return pn_error_format(messenger->error, err, "error decoding
message: %s",
pn_message_error(msg));
- else
+ } else {
return 0;
+ }
}
d = pn_work_next(d);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]