Author: aconway
Date: Fri Jun 6 00:31:33 2014
New Revision: 1600798
URL: http://svn.apache.org/r1600798
Log:
DISPATCH-56: Initial (query only) generic AMQP management client in python.
python/qpid_dispatch_internal/management/amqp.py: Generic client to an AMQP
management node.
Currently supports QUERY operations only.
tests/system_tests_management.py: system tests for management client.
src/agent.c: Send error responses for more error conditions. Log management
errors.
Python test fixes to work with python 2.6.
TODO: specific validating client for qdrouter nodes, using qdrouter schema.
Added:
qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py
(with props)
qpid/dispatch/trunk/tests/management/amqp.py (with props)
qpid/dispatch/trunk/tests/system_tests_management.py (with props)
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/amqp.h
qpid/dispatch/trunk/include/qpid/dispatch/error.h
qpid/dispatch/trunk/src/agent.c
qpid/dispatch/trunk/src/amqp.c
qpid/dispatch/trunk/src/error.c
qpid/dispatch/trunk/src/log.c
qpid/dispatch/trunk/src/log_private.h
qpid/dispatch/trunk/src/message.c
qpid/dispatch/trunk/tests/CMakeLists.txt
qpid/dispatch/trunk/tests/system_test.py
qpid/dispatch/trunk/tests/system_tests_broker.py
qpid/dispatch/trunk/tests/system_tests_one_router.py
qpid/dispatch/trunk/tools/qdtest.in
Modified: qpid/dispatch/trunk/include/qpid/dispatch/amqp.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/amqp.h?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/amqp.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/amqp.h Fri Jun 6 00:31:33 2014
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +20,7 @@
*/
/** @defgroup amqp AMQP
- *
+ *
* AMQP related constant definitions.
*/
/// @{
@@ -105,7 +105,15 @@ const char * const QD_INTERNODE_LINK_NAM
const char * const QD_INTERNODE_LINK_NAME_2;
/// @}
+/** @name AMQP error codes. */
+/// @{
+typedef struct qd_amqp_error_t { int status; const char* description; }
qd_amqp_error_t;
+extern const qd_amqp_error_t QD_AMQP_OK;
+extern const qd_amqp_error_t QD_AMQP_BAD_REQUEST;
+extern const qd_amqp_error_t QD_AMQP_NOT_FOUND;
+extern const qd_amqp_error_t QD_AMQP_NOT_IMPLEMENTED;
/// @}
-#endif
+/// @}
+#endif
Modified: qpid/dispatch/trunk/include/qpid/dispatch/error.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/error.h?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/error.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/error.h Fri Jun 6 00:31:33 2014
@@ -67,4 +67,6 @@ const char* qd_error_message();
*/
qd_error_t qd_error_code();
+/** Maximum length of a qd_error_message, useful for temporary buffers. */
+extern const int QD_ERROR_MAX;
#endif
Added: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py?rev=1600798&view=auto
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py (added)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py Fri
Jun 6 00:31:33 2014
@@ -0,0 +1,260 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+"""
+AMQP management tools for Qpid dispatch.
+"""
+
+import proton, re, threading, httplib
+from collections import namedtuple
+
+class Error(Exception): pass
+
+class ManagementError(Exception):
+ """An AMQP management error. str() gives a string with status code and
text.
+ @ivar status: integer status code
+ @ivar description: description text
+ """
+ def __init__(self, status, description):
+ self.status, self.description = status, description
+
+ def __str__(self):
+ status_str = httplib.responses.get(self.status)
+ if status_str in self.description: return self.description
+ else: return "%s: %s"%(self.status, self.description)
+
+ @staticmethod
+ def check(status, response):
+ if status != httplib.OK:
+ raise ManagementError(status, response)
+
+
+# FIXME aconway 2014-06-03: proton URL class, conditional import?
+class Url:
+ """Simple AMQP URL parser/constructor"""
+
+ RE = re.compile(r"""
+ # [ <scheme>:// ] [ <user> [ : <password> ] @] ( <host4>
| \[ <host6> \] ) [ :<port> ] [ / path]
+ ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: : ([^:/@]+) )? @)? (?: ([^@:/\[]+)
| \[ ([a-f0-9:.]+) \] ) (?: :([0-9]+))? (?: / (.*))? $
+""", re.X | re.I)
+ AMQPS = "amqps"
+ AMQP = "amqp"
+
+ def __init__(self, s=None, **kwargs):
+ """
+ @param s: String value to convert to URL
+ @param kwargs: URL components: scheme, user, password, host, port,
path.
+ """
+ if s is None:
+ self.scheme = kwargs.get('scheme')
+ self.user = kwargs.get('user')
+ self.password = kwargs.get('password')
+ self.host = kwargs.get('host')
+ self.port = kwargs.get('port')
+ if self.host is None:
+ raise ValueError('Host required for url')
+ self.path = kwargs.get('path')
+ elif isinstance(s, Url):
+ self.scheme = s.scheme
+ self.user = s.user
+ self.password = s.password
+ self.host = s.host
+ self.port = s.port
+ self.path = s.path
+ else:
+ match = Url.RE.match(s)
+ if match is None:
+ raise ValueError(s)
+ scheme, self.user, self.password, host4, host6, port, self.path =
match.groups()
+ self.host = host4 or host6
+ self.port = port and int(port)
+ self.scheme = scheme
+
+ def __repr__(self):
+ return "Url(%r)" % str(self)
+
+ def __str__(self):
+ s = ""
+ if self.scheme:
+ s += "%s://" % self.scheme
+ if self.user:
+ s += self.user
+ if self.password:
+ s += ":%s@" % self.password
+ if ':' not in self.host:
+ s += self.host
+ else:
+ s += "[%s]" % self.host
+ if self.port:
+ s += ":%s" % self.port
+ if self.path:
+ s += "/%s" % self.path
+ return s
+
+ def __eq__(self, url):
+ if isinstance(url, basestring):
+ url = Url(url)
+ return \
+ self.scheme==url.scheme and \
+ self.user==url.user and self.password==url.password and \
+ self.host==url.host and self.port==url.port and \
+ self.path==url.path
+
+ def __ne__(self, url):
+ return not self.__eq__(url)
+
+ def defaults(self):
+ """"Fill in defaults for scheme and port if missing """
+ if not self.scheme: self.scheme = self.AMQP
+ if not self.port:
+ if self.scheme==self.AMQP: self.port = 5672
+ elif self.scheme==self.AMQPS: self.port = 5671
+ else: raise ValueError("Invalid URL scheme: %s"%self.scheme)
+ return self
+
+def remove_none(d):
+ """
+ Remove any None values from a dictionary. Does not modify d.
+ """
+ return dict((k, v) for k, v in d.iteritems() if v is not None)
+
+class Node(object):
+
+ SELF='self' # AMQP management node name
+ NODE_TYPE='org.amqp.management' # AMQP management node type
+ NODE_PROPERTIES={'name':SELF, 'type':NODE_TYPE}
+
+ def __init__(self, address, router=None, locales=None):
+ """
+ @param address: AMQP address of the management node.
+ @param router: If address does not contain a path, use the management
node for this router ID.
+ If not specified and address does not contain a path, use the
default management node.
+ @param locales: Default list of locales for management operations.
+ """
+ self.address = Url(address).defaults()
+ self.locales = locales
+ if self.address.path is None:
+ if router:
+ self.address.path = '_topo/0/%s/$management' % router
+ else:
+ self.address.path = '$management'
+ self.responses = {}
+
+ self.messenger = proton.Messenger()
+ self.messenger.start()
+ self.messenger.timeout = 1 # FIXME aconway 2014-06-02: config
+ subscribe_address = Url(address)
+ subscribe_address.path = "#"
+ self.subscription = self.messenger.subscribe(str(subscribe_address))
+ self._flush()
+ self.reply_to = self.subscription.address
+
+ def stop(self):
+ if not self.messenger: return
+ self.messenger.stop()
+ self.messenger = None
+
+ def __del__(self):
+ self.stop()
+
+ def _flush(self):
+ """Call self.messenger.work() till there is no work left."""
+ while self.messenger.work(0.01):
+ pass
+
+ CORRELATION_ID = 0
+ CORRELATION_LOCK = threading.Lock
+
+ def correlation_id(self):
+ """Get the next correlation ID. Thread safe."""
+ self.CORRELATION_ID += 1
+ return self.CORRELATION_ID
+
+ def check_response(self, response, correlation_id=None):
+ """
+ Check a manaement response message for errors and correlation ID.
+ """
+ properties = response.properties
+ ManagementError.check(properties.get('statusCode'),
properties.get('statusDescription'))
+ if correlation_id is not None and response.correlation_id !=
correlation_id:
+ raise ManagementError("Bad correlation id request=%s,
response=%s"%(
+ correlation_id, response.correlation_id))
+
+
+ def request(self, body=None, **properties):
+ """
+ Make a L{proton.Message} containining a management request.
+ @param body: The request body, a dict or list.
+ @param properties: Map of application-properties for the request.
+ @return: L{proton.Message} containining the management request.
+ """
+ if self.locales: properties.set_default(self.locales)
+ request = proton.Message()
+ request.address=str(self.address)
+ request.reply_to=self.reply_to
+ request.correlation_id=self.correlation_id()
+ request.properties=remove_none(properties)
+ request.body=remove_none(body or {})
+ return request
+
+ def node_request(self, body={}, **properties):
+ return self.request(body, name=self.SELF, type=self.NODE_TYPE,
**properties)
+
+ # TODO aconway 2014-06-03: async send/receive
+ def call(self, request):
+ """
+ Send a management request message, wait for a response.
+ @return: Response message
+ """
+ if not request.address:
+ raise ValueError("Message must have an address")
+ if not request.reply_to:
+ raise ValueError("Message must have reply_to")
+ self.messenger.put(request)
+ self.messenger.send()
+ self._flush()
+ self.messenger.recv(1)
+ response = proton.Message()
+ self.messenger.get(response)
+ self.check_response(response)
+ return response
+
+ class QueryResult(namedtuple('QueryResult', ['attribute_names',
'results'])):
+ """
+ Result returned by L{query}
+ @ivar attribute_names: List of attribute names for the results.
+ @ivar results: List of lists. Each entry is a list of attribute values
+ corresponding to the attribute_names.
+ """
+ pass
+
+ def query(self, entity_type=None, attribute_names=None, offset=None,
count=None):
+ """
+ Send an AMQP management query message and return the response.
+ At least one of entity_type, attribute_names must be specified.
+ @keyword entity_type: The type of entity to query.
+ @keyword attribute_names: A list of attribute names to query.
+ @keyword offset: An integer offset into the list of results to return.
+ @keyword count: A count of the maximum number of results to return.
+ @return: A L{QueryResult}
+ """
+ response = self.call(self.node_request(
+ operation='QUERY', entityType=entity_type, offset=offset,
count=count,
+ body={'attributeNames':attribute_names or []}))
+ return Node.QueryResult(response.body['attributeNames'],
response.body['results'])
Propchange: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/dispatch/trunk/src/agent.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/agent.c?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/agent.c (original)
+++ qpid/dispatch/trunk/src/agent.c Fri Jun 6 00:31:33 2014
@@ -85,7 +85,6 @@ typedef enum {
// Convenience for logging, expects agent to be defined.
#define LOG(LEVEL, MSG, ...) qd_log(agent->log_source, QD_LOG_##LEVEL, MSG,
##__VA_ARGS__)
-
static const char *AGENT_ADDRESS = "$management";
static const char *STATUS_CODE = "statusCode";
static const char *STATUS_DESCRIPTION = "statusDescription";
@@ -104,12 +103,10 @@ static const char *OP_GET_MGMT_NODES =
static const char *BODY_ATTR_NAMES = "attributeNames";
static const char *BODY_RESULTS = "results";
-static const char *BAD_REQUEST = "Bad Request";
-static const char *BAD_REQUEST_NEED_BODY_MAP = "Bad Request - Expected map in
body of the request";
-static const char *BAD_REQUEST_NEED_ALIST = "Bad Request - Expected
attributeList in body map";
-static const char *BAD_REQUEST_ALL_DEFAULT = "Bad Request - At least one of
entityType or attributeNames must be provided";
-static const char *NOT_IMPLEMENTED = "Not Implemented";
-static const char *NOT_FOUND = "Not Found";
+static const char *INVALID_ENTITY_TYPE="Invalid entityType";
+static const char *NEED_BODY_MAP = "Expected map in body of the request";
+static const char *NEED_ALIST = "Expected attributeNames in body map";
+static const char *ALL_DEFAULT = "At least one of entityType or
attributeNames must be provided";
#define ATTR_ABSENT 1000000
#define ATTR_TYPE 1000001
@@ -141,10 +138,10 @@ static qd_composed_field_t *qd_agent_set
field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
qd_compose_start_map(field);
qd_compose_insert_string(field, STATUS_CODE);
- qd_compose_insert_uint(field, 200);
+ qd_compose_insert_uint(field, QD_AMQP_OK.status);
qd_compose_insert_string(field, STATUS_DESCRIPTION);
- qd_compose_insert_string(field, "OK");
+ qd_compose_insert_string(field, QD_AMQP_OK.description);
if (close_ap)
qd_compose_end_map(field);
@@ -153,7 +150,9 @@ static qd_composed_field_t *qd_agent_set
}
-static qd_composed_field_t *qd_agent_setup_error(qd_field_iterator_t
*reply_to, qd_field_iterator_t *cid, uint32_t code, const char *text)
+static qd_composed_field_t *qd_agent_setup_error(
+ qd_agent_t* agent, qd_field_iterator_t *reply_to, qd_field_iterator_t *cid,
+ qd_amqp_error_t error, const char *text)
{
qd_composed_field_t *field = 0;
@@ -179,10 +178,13 @@ static qd_composed_field_t *qd_agent_set
field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
qd_compose_start_map(field);
qd_compose_insert_string(field, STATUS_CODE);
- qd_compose_insert_uint(field, code);
+ qd_compose_insert_uint(field, error.status);
qd_compose_insert_string(field, STATUS_DESCRIPTION);
- qd_compose_insert_string(field, text);
+ char msg[QD_ERROR_MAX];
+ snprintf(msg, sizeof(msg), "%s: %s", error.description, text);
+ LOG(ERROR, "Management error: %s", msg);
+ qd_compose_insert_string(field, msg);
qd_compose_end_map(field);
return field;
@@ -205,6 +207,13 @@ static void qd_agent_send_response(qd_ag
qd_compose_free(field2);
}
+static void qd_agent_send_error(
+ qd_agent_t *agent, qd_field_iterator_t *reply_to, qd_field_iterator_t *cid,
+ qd_amqp_error_t code, const char *text)
+{
+ qd_agent_send_response(agent, qd_agent_setup_error(agent, reply_to, cid,
code, text),
+ 0, reply_to);
+}
static void qd_agent_insert_attr_names(qd_composed_field_t *field,
const qd_agent_class_t *cls,
@@ -345,7 +354,7 @@ static void qd_agent_process_object_quer
// The body of the request must be present
//
if (!body) {
- qd_agent_send_response(agent, qd_agent_setup_error(reply_to, cid,
400, BAD_REQUEST_NEED_BODY_MAP), 0, reply_to);
+ qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST,
NEED_BODY_MAP);
break;
}
@@ -354,7 +363,7 @@ static void qd_agent_process_object_quer
//
body_map = qd_parse(body);
if (!body_map || !qd_parse_ok(body_map) || !qd_parse_is_map(body_map))
{
- qd_agent_send_response(agent, qd_agent_setup_error(reply_to, cid,
400, BAD_REQUEST_NEED_BODY_MAP), 0, reply_to);
+ qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST,
NEED_BODY_MAP);
break;
}
@@ -364,7 +373,7 @@ static void qd_agent_process_object_quer
//
attr_list = qd_parse_value_by_key(body_map, BODY_ATTR_NAMES);
if (!attr_list || !qd_parse_ok(attr_list) ||
!qd_parse_is_list(attr_list)) {
- qd_agent_send_response(agent, qd_agent_setup_error(reply_to, cid,
400, BAD_REQUEST_NEED_ALIST), 0, reply_to);
+ qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST,
NEED_ALIST);
break;
}
@@ -375,7 +384,7 @@ static void qd_agent_process_object_quer
// specified in the request.
//
if (qd_parse_sub_count(attr_list) == 0 && !etype_field) {
- qd_agent_send_response(agent, qd_agent_setup_error(reply_to, cid,
400, BAD_REQUEST_ALL_DEFAULT), 0, reply_to);
+ qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST,
ALL_DEFAULT);
break;
}
@@ -392,7 +401,9 @@ static void qd_agent_process_object_quer
// If the entityType was specified but not found, return an error.
//
if (cls_record == 0) {
- qd_agent_send_response(agent, qd_agent_setup_error(reply_to,
cid, 404, NOT_FOUND), 0, reply_to);
+ char entity[QD_ERROR_MAX];
+ qd_field_iterator_ncopy(cls_string, (unsigned char*)entity,
sizeof(entity));
+ qd_agent_send_error(agent, reply_to, cid, QD_AMQP_NOT_FOUND,
entity);
break;
}
}
@@ -504,7 +515,7 @@ static void qd_agent_process_agent_query
qd_composed_field_t *field = 0;
if (etype && !qd_parse_is_scalar(etype))
- field = qd_agent_setup_error(reply_to, cid, 400, BAD_REQUEST);
+ field = qd_agent_setup_error(agent, reply_to, cid,
QD_AMQP_BAD_REQUEST, INVALID_ENTITY_TYPE);
else {
field = qd_agent_setup_response(reply_to, cid, true);
@@ -596,35 +607,41 @@ static void qd_agent_process_request(qd_
// Parse the message through the body and exit if the message is not well
formed.
//
if (!qd_message_check(msg, QD_DEPTH_BODY)) {
- LOG(ERROR, "Bad request: %s", qd_error_message());
+ LOG(ERROR, "Cannot parse request: %s", qd_error_message());
return;
}
//
+ // Get an iterator for the reply-to. Exit if not found.
+ //
+ qd_field_iterator_t *reply_to = qd_message_field_iterator(msg,
QD_FIELD_REPLY_TO);
+ if (reply_to == 0) {
+ LOG(ERROR, "Reqeust has no reply-to");
+ return;
+ }
+
+ //
+ // Get an iterator for the correlation_id.
+ //
+ qd_field_iterator_t *cid = qd_message_field_iterator_typed(msg,
QD_FIELD_CORRELATION_ID);
+
+ //
// Get an iterator for the application-properties. Exit if the message
has none.
//
qd_field_iterator_t *ap = qd_message_field_iterator(msg,
QD_FIELD_APPLICATION_PROPERTIES);
if (ap == 0) {
- LOG(ERROR, "Bad request: no application-properties");
+ qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST, "No
application-properties");
return;
}
//
- // Get an iterator for the reply-to. Exit if not found.
- //
- qd_field_iterator_t *reply_to = qd_message_field_iterator(msg,
QD_FIELD_REPLY_TO);
- if (reply_to == 0) {
- LOG(ERROR, "Bad request: no reply-to");
- return;
- }
- //
// Try to get a map-view of the application-properties.
//
qd_parsed_field_t *map = qd_parse(ap);
if (map == 0) {
qd_field_iterator_free(ap);
qd_field_iterator_free(reply_to);
- LOG(ERROR, "Bad request: application-properties not a map");
+ qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST,
"Application-properties not a map");
return;
}
@@ -635,7 +652,7 @@ static void qd_agent_process_request(qd_
qd_field_iterator_free(ap);
qd_field_iterator_free(reply_to);
qd_parse_free(map);
- LOG(ERROR, "Bad request: application-properties not a map");
+ qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST,
"Application-properties not a map");
return;
}
@@ -647,15 +664,11 @@ static void qd_agent_process_request(qd_
qd_parse_free(map);
qd_field_iterator_free(ap);
qd_field_iterator_free(reply_to);
+ qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST, "No
operation");
return;
}
//
- // Get an iterator for the correlation_id.
- //
- qd_field_iterator_t *cid = qd_message_field_iterator_typed(msg,
QD_FIELD_CORRELATION_ID);
-
- //
// Dispatch the operation to the appropriate handler
//
qd_field_iterator_t *operation_string = qd_parse_raw(operation);
@@ -673,8 +686,12 @@ static void qd_agent_process_request(qd_
qd_agent_process_agent_query(agent, map, reply_to, cid,
QD_DISCOVER_OPERATIONS);
else if (qd_field_iterator_equal(operation_string, (unsigned char*)
OP_GET_MGMT_NODES))
qd_agent_process_discover_nodes(agent, map, reply_to, cid);
- else
- qd_agent_send_response(agent, qd_agent_setup_error(reply_to, cid, 501,
NOT_IMPLEMENTED), 0, reply_to);
+ else {
+ char op[QD_ERROR_MAX];
+ int i = qd_field_iterator_ncopy(operation_string, (unsigned char*)op,
sizeof(op)-1);
+ op[i] = '\0';
+ qd_agent_send_error(agent, reply_to, cid, QD_AMQP_NOT_IMPLEMENTED, op);
+ }
qd_parse_free(map);
qd_field_iterator_free(ap);
Modified: qpid/dispatch/trunk/src/amqp.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/amqp.c?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/amqp.c (original)
+++ qpid/dispatch/trunk/src/amqp.c Fri Jun 6 00:31:33 2014
@@ -29,3 +29,7 @@ const char * const QD_CAPABILITY_ROUTER
const char * const QD_INTERNODE_LINK_NAME_1 = "qd.internode.1";
const char * const QD_INTERNODE_LINK_NAME_2 = "qd.internode.2";
+const qd_amqp_error_t QD_AMQP_OK = { 200, "OK" };
+const qd_amqp_error_t QD_AMQP_BAD_REQUEST = { 400, "Bad Request" };
+const qd_amqp_error_t QD_AMQP_NOT_FOUND = { 404, "Not Found" };
+const qd_amqp_error_t QD_AMQP_NOT_IMPLEMENTED = { 501, "Not Implemented"};
Modified: qpid/dispatch/trunk/src/error.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/error.c?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/error.c (original)
+++ qpid/dispatch/trunk/src/error.c Fri Jun 6 00:31:33 2014
@@ -22,6 +22,7 @@
#include <stdarg.h>
#include <stdio.h>
#include "static_assert.h"
+#include "log_private.h"
static const char *error_names[] = {
"No Error",
@@ -33,8 +34,8 @@ static const char *error_names[] = {
STATIC_ASSERT(sizeof(error_names)/sizeof(error_names[0]) == QD_ERROR_COUNT,
error_names_wrong_size);
-#define ERROR_MAX 512
-
+#define ERROR_MAX QD_LOG_TEXT_MAX
+const int QD_ERROR_MAX = ERROR_MAX;
static __thread char error_message[ERROR_MAX];
static __thread qd_error_t error_code = 0;
static qd_log_source_t* log_source = 0;
Modified: qpid/dispatch/trunk/src/log.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/log.c?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/log.c (original)
+++ qpid/dispatch/trunk/src/log.c Fri Jun 6 00:31:33 2014
@@ -30,7 +30,7 @@
#include <time.h>
#include <syslog.h>
-#define TEXT_MAX 512
+#define TEXT_MAX QD_LOG_TEXT_MAX
#define LIST_MAX 1000
#define LOG_MAX 640
@@ -89,7 +89,7 @@ static log_sink_t* find_log_sink_lh(cons
static log_sink_t* log_sink_lh(const char* name) {
log_sink_t* sink = find_log_sink_lh(name);
- if (sink)
+ if (sink)
sink->refcount++;
else {
sink = NEW(log_sink_t);
@@ -362,4 +362,3 @@ void qd_log_configure(const qd_dispatch_
}
qd_log(logging_log_source, QD_LOG_INFO, "Logging system configured");
}
-
Modified: qpid/dispatch/trunk/src/log_private.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/log_private.h?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/log_private.h (original)
+++ qpid/dispatch/trunk/src/log_private.h Fri Jun 6 00:31:33 2014
@@ -23,5 +23,5 @@
void qd_log_initialize(void);
void qd_log_finalize(void);
-
+#define QD_LOG_TEXT_MAX 512
#endif
Modified: qpid/dispatch/trunk/src/message.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/message.c?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/message.c (original)
+++ qpid/dispatch/trunk/src/message.c Fri Jun 6 00:31:33 2014
@@ -101,12 +101,11 @@ static const char REPR_END[] = "}\0";
/* TODO aconway 2014-05-13: more detailed message representation. */
char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len) {
+ qd_message_check(msg, QD_DEPTH_BODY);
int i = 0;
len -= sizeof(REPR_END); /* Save space for ending */
i += snprintf(buffer+i, len-i, "Message(%p){", msg);
- if (!qd_message_check(msg, QD_DEPTH_BODY))
- i += snprintf(buffer+i, len-i, "<%s>", qd_error_message());
- i += copy_field(msg, QD_FIELD_TO, INT_MAX, " to='", "'", buffer+i, len-i);
+ i += copy_field(msg, QD_FIELD_TO, INT_MAX, "to='", "'", buffer+i, len-i);
i += copy_field(msg, QD_FIELD_REPLY_TO, INT_MAX, " reply-to='", "'",
buffer+i, len-i);
i += copy_field(msg, QD_FIELD_BODY, 16, " body='", "'", buffer+i, len-i);
assert(i <= len);
Modified: qpid/dispatch/trunk/tests/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/CMakeLists.txt?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/tests/CMakeLists.txt Fri Jun 6 00:31:33 2014
@@ -54,7 +54,7 @@ add_test(unit_tests unit_test
add_test(router_tests ${PYTHON_EXECUTABLE}
${CMAKE_CURRENT_SOURCE_DIR}/router_engine_test.py -v)
add_test(management_tests ${PYTHON_EXECUTABLE}
${CMAKE_BINARY_DIR}/build_env.py ${PYTHON_EXECUTABLE} -m unittest -v management)
-set(SYSTEM_TEST_FILES system_test.py system_tests_one_router.py
system_tests_two_routers.py system_tests_broker.py)
+set(SYSTEM_TEST_FILES system_test.py system_tests_one_router.py
system_tests_two_routers.py system_tests_broker.py system_tests_management.py)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config_build.sh.in
${CMAKE_CURRENT_BINARY_DIR}/config_build.sh)
Added: qpid/dispatch/trunk/tests/management/amqp.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/management/amqp.py?rev=1600798&view=auto
==============================================================================
--- qpid/dispatch/trunk/tests/management/amqp.py (added)
+++ qpid/dispatch/trunk/tests/management/amqp.py Fri Jun 6 00:31:33 2014
@@ -0,0 +1,63 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+"""Tests for management.amqp"""
+
+import unittest
+from qpid_dispatch_internal.management.amqp import Url
+
+class UrlTest(unittest.TestCase):
+
+ def test_url(self):
+ url = Url(scheme='amqp', user='me', password='secret', host='myhost',
port=1234, name='foobar')
+ self.assertEqual(str(url), "amqp://me:secret@myhost:1234/foobar")
+ self.assertEqual(
+ [url.scheme, url.user, url.password, url.host, url.port, url.name],
+ ['amqp', 'me', 'secret', 'myhost', 1234, 'foobar']
+ )
+
+ self.assertEqual(str(url), "amqp://me:secret@myhost:1234/foobar")
+ self.assertNotEqual(str(url), "amqps://me:secret@myhost:1234/foobar")
+ self.assertNotEqual(str(url), "me:secret@myhost:1234/foobar")
+ self.assertNotEqual(str(url), "amqp://notme:secret@myhost:1234/foobar")
+ self.assertNotEqual(str(url), "amqp://me:notsecret@myhost:1234/foobar")
+ self.assertNotEqual(str(url), "amqp://me:secret@notmyhost:1234/foobar")
+ self.assertNotEqual(str(url), "amqp://me:secret@myhost:1234/notfoobar")
+ self.assertNotEqual(str(url), "amqp://me:secret@myhost:5555/foobar")
+
+ # Check that we allow None for scheme, port
+ url = Url(user='me', password='secret', host='myhost', name='foobar')
+ self.assertEqual(str(url), "me:secret@myhost/foobar")
+ self.assertEqual(
+ [url.scheme, url.user, url.password, url.host, url.port, url.name],
+ [None, 'me', 'secret', 'myhost', None, 'foobar']
+ )
+
+ # Scheme defaults
+ self.assertEqual(str(Url("me:secret@myhost/foobar").defaults()),
+ "amqp://me:secret@myhost:5672/foobar")
+ # Correct port for amqps vs. amqps
+
self.assertEqual(str(Url("amqps://me:secret@myhost/foobar").defaults()),
+ "amqps://me:secret@myhost:5671/foobar")
+ self.assertEqual(str(Url("amqp://me:secret@myhost/foobar").defaults()),
+ "amqp://me:secret@myhost:5672/foobar")
+
+ # Empty string for name
+ self.assertEqual(Url("myhost/").name, "")
+ self.assertIsNone(Url("myhost").name)
Propchange: qpid/dispatch/trunk/tests/management/amqp.py
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/dispatch/trunk/tests/system_test.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Fri Jun 6 00:31:33 2014
@@ -173,7 +173,7 @@ def wait_port(port, host='0.0.0.0', **re
retry_exception(lambda: s.connect((host, port)), exception_test=check,
**retry_kwargs)
except Exception, e:
- raise Exception("wait_port timeout on %s:%s: %s"%(host, port, e))
+ raise Exception("wait_port timeout on host %s port %s: %s"%(host,
port, e))
finally: s.close()
@@ -187,6 +187,7 @@ def message(**properties):
"""Convenience to create a proton.Message with properties set"""
m = Message()
for name, value in properties.iteritems():
+ getattr(m, name) # Raise exception if not a valid message
attribute.
setattr(m, name, value)
return m
@@ -473,7 +474,7 @@ class Tester(object):
if a:
a()
break
- os.chdir(self.save_dir)
+ if self.save_dir: os.chdir(self.save_dir)
def cleanup(self, x):
"""Record object x for clean-up during tear-down.
@@ -553,30 +554,34 @@ class TestCase(unittest.TestCase, Tester
cls.tester.teardown()
def setUp(self):
+ # Hack to support setUpClass on older python.
+ # If the class has not already been set up, do it now.
+ if not hasattr(self.__class__, 'tester'):
+ self.setUpClass()
# self.id() is normally the fully qualified method name
Tester.setup(self, os.path.join(self.base_dir(),
self.id().split(".")[-1]))
def tearDown(self):
Tester.teardown(self)
+ # Hack to support tearDownClass on older versions of python.
+ if hasattr(self.__class__, '_tear_down_class'):
+ self.tearDownClass()
def skipTest(self, reason):
"""Workaround missing unittest.TestCase.skipTest in python 2.6.
The caller must return in order to end the test"""
if hasattr(unittest.TestCase, 'skipTest'):
- self.skipTest(reason)
+ unittest.TestCase.skipTest(self, reason)
else:
print "Skipping test", id(), reason
- # Hack to support setUpClass/tearDownClass on older versions of python.
+ # Hack to support tearDownClass on older versions of python.
# The default TestLoader sorts tests alphabetically so we insert
- # fake tests that will run first and last to call the class setup/teardown
functions.
+ # a fake tests that will run last to call tearDownClass.
if not hasattr(unittest.TestCase, 'setUpClass'):
- def test_0000_setup_class(self):
- """Fake test to call setUpClass"""
- self.setUpClass()
def test_zzzz_teardown_class(self):
"""Fake test to call tearDownClass"""
- self.tearDownClass()
+ self.__class__._tear_down_class = True
def assert_fair(self, seq):
avg = sum(seq)/len(seq)
Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Fri Jun 6 00:31:33 2014
@@ -22,7 +22,7 @@ System tests involving one or more broke
with waypoints.
"""
import unittest, system_test
-from system_test import wait_port, wait_ports, Qdrouterd, retry, message, MISSING_REQUIREMENTS
+from system_test import wait_port, Qdrouterd, message, MISSING_REQUIREMENTS
from itertools import cycle
class DistributedQueueTest(system_test.TestCase): # pylint: disable=too-many-public-methods
Added: qpid/dispatch/trunk/tests/system_tests_management.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_management.py?rev=1600798&view=auto
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_management.py (added)
+++ qpid/dispatch/trunk/tests/system_tests_management.py Fri Jun 6 00:31:33 2014
@@ -0,0 +1,101 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+"""System tests for management of qdrouter"""
+
+import unittest, system_test, re
+from qpid_dispatch_internal.management import amqp
+from system_test import Qdrouterd, MISSING_REQUIREMENTS
+from httplib import BAD_REQUEST, NOT_IMPLEMENTED
+
+class ManagementTest(system_test.TestCase): # pylint: disable=too-many-public-methods
+
+ @classmethod
+ def setUpClass(cls):
+ super(ManagementTest, cls).setUpClass()
+ name = cls.__name__
+ conf = Qdrouterd.Config([
+ ('log', {'module':'DEFAULT', 'level':'trace', 'output':name+".log"}),
+ ('router', {'mode': 'standalone', 'router-id': name}),
+ ('listener', {'port':cls.get_port(), 'role':'normal'})
+ ])
+ cls.router = cls.tester.qdrouterd('%s'%name, conf)
+ cls.router.wait_ready()
+
+ def setUp(self):
+ super(ManagementTest, self).setUp()
+ self.node = self.cleanup(amqp.Node(self.router.addresses[0]))
+
+ def assertRaisesManagement(self, status, pattern, call, *args, **kwargs):
+ """Assert that call(*args, **kwargs) raises a ManagementError
+ with status and matching pattern in description """
+ try:
+ call(*args, **kwargs)
+ self.fail("Expected ManagementError with %s, %s"%(status, pattern))
+ except amqp.ManagementError, e:
+ self.assertEqual(e.status, status)
+ assert re.search("(?i)"+pattern, e.description), "No match for %s in %s"%(pattern, e.description)
+
+ def test_bad_query(self):
+ """Test that various badly formed queries get the proper response"""
+ self.assertRaisesManagement(
+ BAD_REQUEST, "No operation", self.node.call, self.node.request())
+ self.assertRaisesManagement(
+ NOT_IMPLEMENTED, "Not Implemented: nosuch",
+ self.node.call, self.node.request(operation="nosuch"))
+ self.assertRaisesManagement(
+ BAD_REQUEST, r'(entityType|attributeNames).*must be provided',
+ self.node.query)
+
+ def test_query_entity_type(self):
+ # FIXME aconway 2014-06-03: prefix support in Node, get from schema.
+ address = 'org.apache.qpid.dispatch.router.address'
+ response = self.node.query(entity_type=address)
+ self.assertEqual(response.attribute_names[0:3], ['type', 'name', 'identity'])
+ for r in response.results: # Check types
+ self.assertEqual(r[0], address)
+ names = [r[1] for r in response.results]
+ self.assertTrue('L$management' in names)
+ self.assertTrue('M0$management' in names)
+
+ # FIXME aconway 2014-06-05: negative test: offset, count not implemented on router
+ try:
+ # Try offset, count
+ self.assertGreater(len(names), 2)
+ response0 = self.node.query(entity_type=address, count=1)
+ self.assertEqual(names[0:1], [r[1] for r in response0.results])
+ response1_2 = self.node.query(entity_type=address, count=2, offset=1)
+ self.assertEqual(names[1:3], [r[1] for r in response1_2.results])
+ self.fail("Negative test passed!")
+ except: pass
+
+ def test_query_attribute_names(self):
+ response = self.node.query(attribute_names=["type", "name", "identity"])
+ # FIXME aconway 2014-06-05: negative test: attribute_names query doesn't work.
+ # Need a better test.
+ try:
+ self.assertNotEqual([], response.results)
+ self.fail("Negative test passed!")
+ except: pass
+
+if __name__ == '__main__':
+ if MISSING_REQUIREMENTS:
+ print MISSING_REQUIREMENTS
+ else:
+ unittest.main()
Propchange: qpid/dispatch/trunk/tests/system_tests_management.py
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/dispatch/trunk/tests/system_tests_one_router.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_one_router.py?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_one_router.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_one_router.py Fri Jun 6 00:31:33 2014
@@ -17,13 +17,9 @@
# under the License.
#
-import sys
-import os
-import time
import unittest
-import subprocess
from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED
-from system_test import TestCase, message, Messenger, Qdrouterd
+from system_test import TestCase, Messenger, Qdrouterd
class RouterTest(TestCase):
"""System tests involving a single router"""
Modified: qpid/dispatch/trunk/tools/qdtest.in
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdtest.in?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdtest.in (original)
+++ qpid/dispatch/trunk/tools/qdtest.in Fri Jun 6 00:31:33 2014
@@ -36,5 +36,8 @@ python $QPID_DISPATCH_HOME/tests/system_
echo "Running system_tests_broker.py"
python $QPID_DISPATCH_HOME/tests/system_tests_broker.py -v
+echo "Running system_tests_management.py"
+python $QPID_DISPATCH_HOME/tests/system_tests_management.py -v
+
echo "Running qdstat_test.sh"
bash $QPID_DISPATCH_HOME/tests/qdstat_test.sh
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]