Author: aconway
Date: Fri Jun  6 00:31:33 2014
New Revision: 1600798

URL: http://svn.apache.org/r1600798
Log:
DISPATCH-56: Initial (query only) generic AMQP management client in python.

python/qpid_dispatch_internal/management/amqp.py: Generic client to an AMQP
management node.
Currently supports QUERY operations only.

tests/system_tests_management.py: system tests for management client.

src/agent.c: Send error responses for more error conditions. Log management 
errors.

Python test fixes to work with python 2.6.

TODO: specific validating client for qdrouter nodes, using qdrouter schema.

Added:
    qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py   
(with props)
    qpid/dispatch/trunk/tests/management/amqp.py   (with props)
    qpid/dispatch/trunk/tests/system_tests_management.py   (with props)
Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/amqp.h
    qpid/dispatch/trunk/include/qpid/dispatch/error.h
    qpid/dispatch/trunk/src/agent.c
    qpid/dispatch/trunk/src/amqp.c
    qpid/dispatch/trunk/src/error.c
    qpid/dispatch/trunk/src/log.c
    qpid/dispatch/trunk/src/log_private.h
    qpid/dispatch/trunk/src/message.c
    qpid/dispatch/trunk/tests/CMakeLists.txt
    qpid/dispatch/trunk/tests/system_test.py
    qpid/dispatch/trunk/tests/system_tests_broker.py
    qpid/dispatch/trunk/tests/system_tests_one_router.py
    qpid/dispatch/trunk/tools/qdtest.in

Modified: qpid/dispatch/trunk/include/qpid/dispatch/amqp.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/amqp.h?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/amqp.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/amqp.h Fri Jun  6 00:31:33 2014
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +20,7 @@
  */
 
 /** @defgroup amqp AMQP
- * 
+ *
  * AMQP related constant definitions.
  */
 /// @{
@@ -105,7 +105,15 @@ const char * const QD_INTERNODE_LINK_NAM
 const char * const QD_INTERNODE_LINK_NAME_2;
 /// @}
 
+/** @name AMQP error codes. */
+/// @{
+typedef struct qd_amqp_error_t { int status; const char* description; } 
qd_amqp_error_t;
+extern const qd_amqp_error_t QD_AMQP_OK;
+extern const qd_amqp_error_t QD_AMQP_BAD_REQUEST;
+extern const qd_amqp_error_t QD_AMQP_NOT_FOUND;
+extern const qd_amqp_error_t QD_AMQP_NOT_IMPLEMENTED;
 /// @}
 
-#endif
+/// @}
 
+#endif

Modified: qpid/dispatch/trunk/include/qpid/dispatch/error.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/error.h?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/error.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/error.h Fri Jun  6 00:31:33 2014
@@ -67,4 +67,6 @@ const char* qd_error_message();
  */
 qd_error_t qd_error_code();
 
+/** Maximum length of a qd_error_message, useful for temporary buffers. */
+extern const int QD_ERROR_MAX;
 #endif

Added: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py?rev=1600798&view=auto
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py (added)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py Fri 
Jun  6 00:31:33 2014
@@ -0,0 +1,260 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied.  See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+"""
+AMQP management tools for Qpid dispatch.
+"""
+
+import proton, re, threading, httplib
+from collections import namedtuple
+
+class Error(Exception): pass
+
+class ManagementError(Exception):
+    """An AMQP management error. str() gives a string with status code and 
text.
+    @ivar status: integer status code
+    @ivar description: description text
+    """
+    def __init__(self, status, description):
+        self.status, self.description = status, description
+
+    def __str__(self):
+        status_str = httplib.responses.get(self.status)
+        if status_str in self.description: return self.description
+        else: return "%s: %s"%(self.status, self.description)
+
+    @staticmethod
+    def check(status, response):
+        if status != httplib.OK:
+            raise ManagementError(status, response)
+
+
+# FIXME aconway 2014-06-03: proton URL class, conditional import?
+class Url:
+    """Simple AMQP URL parser/constructor"""
+
+    RE = re.compile(r"""
+    # [   <scheme>://  ] [   <user>   [    : <password> ]  @]      ( <host4>   
| \[    <host6>    \] ) [   :<port>   ]   [ / path]
+    ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: : ([^:/@]+)  )? @)? (?: ([^@:/\[]+) 
| \[ ([a-f0-9:.]+) \] ) (?: :([0-9]+))? (?: / (.*))? $
+""", re.X | re.I)
+    AMQPS = "amqps"
+    AMQP = "amqp"
+
+    def __init__(self, s=None, **kwargs):
+        """
+        @param s: String value to convert to URL
+        @param kwargs: URL components: scheme, user, password, host, port, 
path.
+        """
+        if s is None:
+            self.scheme = kwargs.get('scheme')
+            self.user = kwargs.get('user')
+            self.password = kwargs.get('password')
+            self.host = kwargs.get('host')
+            self.port = kwargs.get('port')
+            if self.host is None:
+                raise ValueError('Host required for url')
+            self.path = kwargs.get('path')
+        elif isinstance(s, Url):
+            self.scheme = s.scheme
+            self.user = s.user
+            self.password = s.password
+            self.host = s.host
+            self.port = s.port
+            self.path = s.path
+        else:
+            match = Url.RE.match(s)
+            if match is None:
+                raise ValueError(s)
+            scheme, self.user, self.password, host4, host6, port, self.path = 
match.groups()
+            self.host = host4 or host6
+            self.port = port and int(port)
+            self.scheme = scheme
+
+    def __repr__(self):
+        return "Url(%r)" % str(self)
+
+    def __str__(self):
+        s = ""
+        if self.scheme:
+            s += "%s://" % self.scheme
+        if self.user:
+            s += self.user
+        if self.password:
+            s += ":%s@" % self.password
+        if ':' not in self.host:
+            s += self.host
+        else:
+            s += "[%s]" % self.host
+        if self.port:
+            s += ":%s" % self.port
+        if self.path:
+            s += "/%s" % self.path
+        return s
+
+    def __eq__(self, url):
+         if isinstance(url, basestring):
+             url = Url(url)
+         return \
+             self.scheme==url.scheme and \
+             self.user==url.user and self.password==url.password and \
+             self.host==url.host and self.port==url.port and \
+             self.path==url.path
+
+    def __ne__(self, url):
+         return not self.__eq__(url)
+
+    def defaults(self):
+        """"Fill in defaults for scheme and port if missing """
+        if not self.scheme: self.scheme = self.AMQP
+        if not self.port:
+            if self.scheme==self.AMQP: self.port = 5672
+            elif self.scheme==self.AMQPS: self.port = 5671
+            else: raise ValueError("Invalid URL scheme: %s"%self.scheme)
+        return self
+
+def remove_none(d):
+    """
+    Remove any None values from a dictionary. Does not modify d.
+    """
+    return dict((k, v) for k, v in d.iteritems() if v is not None)
+
+class Node(object):
+
+    SELF='self'                 # AMQP management node name
+    NODE_TYPE='org.amqp.management' # AMQP management node type
+    NODE_PROPERTIES={'name':SELF, 'type':NODE_TYPE}
+
+    def __init__(self, address, router=None, locales=None):
+        """
+        @param address: AMQP address of the management node.
+        @param router: If address does not contain a path, use the management 
node for this router ID.
+            If not specified and address does not contain a path, use the 
default management node.
+        @param locales: Default list of locales for management operations.
+        """
+        self.address = Url(address).defaults()
+        self.locales = locales
+        if self.address.path is None:
+            if router:
+                self.address.path = '_topo/0/%s/$management' % router
+            else:
+                self.address.path = '$management'
+        self.responses = {}
+
+        self.messenger = proton.Messenger()
+        self.messenger.start()
+        self.messenger.timeout = 1 # FIXME aconway 2014-06-02: config
+        subscribe_address = Url(address)
+        subscribe_address.path = "#"
+        self.subscription = self.messenger.subscribe(str(subscribe_address))
+        self._flush()
+        self.reply_to = self.subscription.address
+
+    def stop(self):
+        if not self.messenger: return
+        self.messenger.stop()
+        self.messenger = None
+
+    def __del__(self):
+        self.stop()
+
+    def _flush(self):
+        """Call self.messenger.work() till there is no work left."""
+        while self.messenger.work(0.01):
+            pass
+
+    CORRELATION_ID = 0
+    CORRELATION_LOCK = threading.Lock
+
+    def correlation_id(self):
+        """Get the next correlation ID. Thread safe."""
+        self.CORRELATION_ID += 1
+        return self.CORRELATION_ID
+
+    def check_response(self, response, correlation_id=None):
+        """
+        Check a manaement response message for errors and correlation ID.
+        """
+        properties = response.properties
+        ManagementError.check(properties.get('statusCode'), 
properties.get('statusDescription'))
+        if correlation_id is not None and response.correlation_id != 
correlation_id:
+            raise ManagementError("Bad correlation id request=%s, 
response=%s"%(
+                correlation_id, response.correlation_id))
+
+
+    def request(self, body=None, **properties):
+        """
+        Make a L{proton.Message} containining a management request.
+        @param body: The request body, a dict or list.
+        @param properties: Map of application-properties for the request.
+        @return: L{proton.Message} containining the management request.
+        """
+        if self.locales: properties.set_default(self.locales)
+        request = proton.Message()
+        request.address=str(self.address)
+        request.reply_to=self.reply_to
+        request.correlation_id=self.correlation_id()
+        request.properties=remove_none(properties)
+        request.body=remove_none(body or {})
+        return request
+
+    def node_request(self, body={}, **properties):
+        return self.request(body, name=self.SELF, type=self.NODE_TYPE, 
**properties)
+
+    # TODO aconway 2014-06-03: async send/receive
+    def call(self, request):
+        """
+        Send a management request message, wait for a response.
+        @return: Response message
+        """
+        if not request.address:
+            raise ValueError("Message must have an address")
+        if not request.reply_to:
+            raise ValueError("Message must have reply_to")
+        self.messenger.put(request)
+        self.messenger.send()
+        self._flush()
+        self.messenger.recv(1)
+        response = proton.Message()
+        self.messenger.get(response)
+        self.check_response(response)
+        return response
+
+    class QueryResult(namedtuple('QueryResult', ['attribute_names', 
'results'])):
+        """
+        Result returned by L{query}
+        @ivar attribute_names: List of attribute names for the results.
+        @ivar results: List of lists. Each entry is a list of attribute values
+            corresponding to the attribute_names.
+        """
+        pass
+
+    def query(self, entity_type=None, attribute_names=None, offset=None, 
count=None):
+        """
+        Send an AMQP management query message and return the response.
+        At least one of entity_type, attribute_names must be specified.
+        @keyword entity_type: The type of entity to query.
+        @keyword attribute_names: A list of attribute names to query.
+        @keyword offset: An integer offset into the list of results to return.
+        @keyword count: A count of the maximum number of results to return.
+        @return: A L{QueryResult}
+        """
+        response = self.call(self.node_request(
+            operation='QUERY', entityType=entity_type, offset=offset, 
count=count,
+            body={'attributeNames':attribute_names or []}))
+        return Node.QueryResult(response.body['attributeNames'], 
response.body['results'])

Propchange: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/dispatch/trunk/src/agent.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/agent.c?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/agent.c (original)
+++ qpid/dispatch/trunk/src/agent.c Fri Jun  6 00:31:33 2014
@@ -85,7 +85,6 @@ typedef enum {
 // Convenience for logging, expects agent to be defined.
 #define LOG(LEVEL, MSG, ...) qd_log(agent->log_source, QD_LOG_##LEVEL, MSG, 
##__VA_ARGS__)
 
-
 static const char *AGENT_ADDRESS       = "$management";
 static const char *STATUS_CODE         = "statusCode";
 static const char *STATUS_DESCRIPTION  = "statusDescription";
@@ -104,12 +103,10 @@ static const char *OP_GET_MGMT_NODES   =
 static const char *BODY_ATTR_NAMES     = "attributeNames";
 static const char *BODY_RESULTS        = "results";
 
-static const char *BAD_REQUEST               = "Bad Request";
-static const char *BAD_REQUEST_NEED_BODY_MAP = "Bad Request - Expected map in 
body of the request";
-static const char *BAD_REQUEST_NEED_ALIST    = "Bad Request - Expected 
attributeList in body map";
-static const char *BAD_REQUEST_ALL_DEFAULT   = "Bad Request - At least one of 
entityType or attributeNames must be provided";
-static const char *NOT_IMPLEMENTED           = "Not Implemented";
-static const char *NOT_FOUND                 = "Not Found";
+static const char *INVALID_ENTITY_TYPE="Invalid entityType";
+static const char *NEED_BODY_MAP = "Expected map in body of the request";
+static const char *NEED_ALIST    = "Expected attributeNames in body map";
+static const char *ALL_DEFAULT   = "At least one of entityType or 
attributeNames must be provided";
 
 #define ATTR_ABSENT 1000000
 #define ATTR_TYPE   1000001
@@ -141,10 +138,10 @@ static qd_composed_field_t *qd_agent_set
     field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
     qd_compose_start_map(field);
     qd_compose_insert_string(field, STATUS_CODE);
-    qd_compose_insert_uint(field, 200);
+    qd_compose_insert_uint(field, QD_AMQP_OK.status);
 
     qd_compose_insert_string(field, STATUS_DESCRIPTION);
-    qd_compose_insert_string(field, "OK");
+    qd_compose_insert_string(field, QD_AMQP_OK.description);
 
     if (close_ap)
         qd_compose_end_map(field);
@@ -153,7 +150,9 @@ static qd_composed_field_t *qd_agent_set
 }
 
 
-static qd_composed_field_t *qd_agent_setup_error(qd_field_iterator_t 
*reply_to, qd_field_iterator_t *cid, uint32_t code, const char *text)
+static qd_composed_field_t *qd_agent_setup_error(
+    qd_agent_t* agent, qd_field_iterator_t *reply_to, qd_field_iterator_t *cid,
+    qd_amqp_error_t error, const char *text)
 {
     qd_composed_field_t *field = 0;
 
@@ -179,10 +178,13 @@ static qd_composed_field_t *qd_agent_set
     field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
     qd_compose_start_map(field);
     qd_compose_insert_string(field, STATUS_CODE);
-    qd_compose_insert_uint(field, code);
+    qd_compose_insert_uint(field, error.status);
 
     qd_compose_insert_string(field, STATUS_DESCRIPTION);
-    qd_compose_insert_string(field, text);
+    char msg[QD_ERROR_MAX];
+    snprintf(msg, sizeof(msg), "%s: %s", error.description, text);
+    LOG(ERROR, "Management error: %s", msg);
+    qd_compose_insert_string(field, msg);
     qd_compose_end_map(field);
 
     return field;
@@ -205,6 +207,13 @@ static void qd_agent_send_response(qd_ag
         qd_compose_free(field2);
 }
 
+static void qd_agent_send_error(
+    qd_agent_t *agent, qd_field_iterator_t *reply_to, qd_field_iterator_t *cid,
+    qd_amqp_error_t code, const char *text)
+{
+    qd_agent_send_response(agent, qd_agent_setup_error(agent, reply_to, cid, 
code, text),
+                          0, reply_to);
+}
 
 static void qd_agent_insert_attr_names(qd_composed_field_t    *field,
                                        const qd_agent_class_t *cls,
@@ -345,7 +354,7 @@ static void qd_agent_process_object_quer
         // The body of the request must be present
         //
         if (!body) {
-            qd_agent_send_response(agent, qd_agent_setup_error(reply_to, cid, 
400, BAD_REQUEST_NEED_BODY_MAP), 0, reply_to);
+            qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST, 
NEED_BODY_MAP);
             break;
         }
 
@@ -354,7 +363,7 @@ static void qd_agent_process_object_quer
         //
         body_map = qd_parse(body);
         if (!body_map || !qd_parse_ok(body_map) || !qd_parse_is_map(body_map)) 
{
-            qd_agent_send_response(agent, qd_agent_setup_error(reply_to, cid, 
400, BAD_REQUEST_NEED_BODY_MAP), 0, reply_to);
+            qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST, 
NEED_BODY_MAP);
             break;
         }
 
@@ -364,7 +373,7 @@ static void qd_agent_process_object_quer
         //
         attr_list = qd_parse_value_by_key(body_map, BODY_ATTR_NAMES);
         if (!attr_list || !qd_parse_ok(attr_list) || 
!qd_parse_is_list(attr_list)) {
-            qd_agent_send_response(agent, qd_agent_setup_error(reply_to, cid, 
400, BAD_REQUEST_NEED_ALIST), 0, reply_to);
+            qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST, 
NEED_ALIST);
             break;
         }
 
@@ -375,7 +384,7 @@ static void qd_agent_process_object_quer
         // specified in the request.
         //
         if (qd_parse_sub_count(attr_list) == 0 && !etype_field) {
-            qd_agent_send_response(agent, qd_agent_setup_error(reply_to, cid, 
400, BAD_REQUEST_ALL_DEFAULT), 0, reply_to);
+            qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST, 
ALL_DEFAULT);
             break;
         }
 
@@ -392,7 +401,9 @@ static void qd_agent_process_object_quer
             // If the entityType was specified but not found, return an error.
             //
             if (cls_record == 0) {
-                qd_agent_send_response(agent, qd_agent_setup_error(reply_to, 
cid, 404, NOT_FOUND), 0, reply_to);
+               char entity[QD_ERROR_MAX];
+               qd_field_iterator_ncopy(cls_string, (unsigned char*)entity, 
sizeof(entity));
+                qd_agent_send_error(agent, reply_to, cid, QD_AMQP_NOT_FOUND, 
entity);
                 break;
             }
         }
@@ -504,7 +515,7 @@ static void qd_agent_process_agent_query
     qd_composed_field_t *field = 0;
 
     if (etype && !qd_parse_is_scalar(etype))
-        field = qd_agent_setup_error(reply_to, cid, 400, BAD_REQUEST);
+        field = qd_agent_setup_error(agent, reply_to, cid, 
QD_AMQP_BAD_REQUEST, INVALID_ENTITY_TYPE);
     else {
         field = qd_agent_setup_response(reply_to, cid, true);
 
@@ -596,35 +607,41 @@ static void qd_agent_process_request(qd_
     // Parse the message through the body and exit if the message is not well 
formed.
     //
     if (!qd_message_check(msg, QD_DEPTH_BODY)) {
-       LOG(ERROR, "Bad request: %s", qd_error_message());
+       LOG(ERROR, "Cannot parse request: %s", qd_error_message());
        return;
     }
 
     //
+    // Get an iterator for the reply-to.  Exit if not found.
+    //
+    qd_field_iterator_t *reply_to = qd_message_field_iterator(msg, 
QD_FIELD_REPLY_TO);
+    if (reply_to == 0) {
+       LOG(ERROR, "Reqeust has no reply-to");
+        return;
+    }
+
+    //
+    // Get an iterator for the correlation_id.
+    //
+    qd_field_iterator_t *cid = qd_message_field_iterator_typed(msg, 
QD_FIELD_CORRELATION_ID);
+
+    //
     // Get an iterator for the application-properties.  Exit if the message 
has none.
     //
     qd_field_iterator_t *ap = qd_message_field_iterator(msg, 
QD_FIELD_APPLICATION_PROPERTIES);
     if (ap == 0) {
-       LOG(ERROR, "Bad request: no application-properties");
+       qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST, "No 
application-properties");
        return;
     }
 
     //
-    // Get an iterator for the reply-to.  Exit if not found.
-    //
-    qd_field_iterator_t *reply_to = qd_message_field_iterator(msg, 
QD_FIELD_REPLY_TO);
-    if (reply_to == 0) {
-       LOG(ERROR, "Bad request: no reply-to");
-        return;
-    }
-    //
     // Try to get a map-view of the application-properties.
     //
     qd_parsed_field_t *map = qd_parse(ap);
     if (map == 0) {
         qd_field_iterator_free(ap);
         qd_field_iterator_free(reply_to);
-       LOG(ERROR, "Bad request: application-properties not a map");
+       qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST, 
"Application-properties not a map");
         return;
     }
 
@@ -635,7 +652,7 @@ static void qd_agent_process_request(qd_
         qd_field_iterator_free(ap);
         qd_field_iterator_free(reply_to);
         qd_parse_free(map);
-       LOG(ERROR, "Bad request: application-properties not a map");
+       qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST, 
"Application-properties not a map");
         return;
     }
 
@@ -647,15 +664,11 @@ static void qd_agent_process_request(qd_
         qd_parse_free(map);
         qd_field_iterator_free(ap);
         qd_field_iterator_free(reply_to);
+       qd_agent_send_error(agent, reply_to, cid, QD_AMQP_BAD_REQUEST, "No 
operation");
         return;
     }
 
     //
-    // Get an iterator for the correlation_id.
-    //
-    qd_field_iterator_t *cid = qd_message_field_iterator_typed(msg, 
QD_FIELD_CORRELATION_ID);
-
-    //
     // Dispatch the operation to the appropriate handler
     //
     qd_field_iterator_t *operation_string = qd_parse_raw(operation);
@@ -673,8 +686,12 @@ static void qd_agent_process_request(qd_
         qd_agent_process_agent_query(agent, map, reply_to, cid, 
QD_DISCOVER_OPERATIONS);
     else if (qd_field_iterator_equal(operation_string, (unsigned char*) 
OP_GET_MGMT_NODES))
         qd_agent_process_discover_nodes(agent, map, reply_to, cid);
-    else
-        qd_agent_send_response(agent, qd_agent_setup_error(reply_to, cid, 501, 
NOT_IMPLEMENTED), 0, reply_to);
+    else {
+       char op[QD_ERROR_MAX];
+       int i = qd_field_iterator_ncopy(operation_string, (unsigned char*)op, 
sizeof(op)-1);
+       op[i] = '\0';
+        qd_agent_send_error(agent, reply_to, cid, QD_AMQP_NOT_IMPLEMENTED, op);
+    }
 
     qd_parse_free(map);
     qd_field_iterator_free(ap);

Modified: qpid/dispatch/trunk/src/amqp.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/amqp.c?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/amqp.c (original)
+++ qpid/dispatch/trunk/src/amqp.c Fri Jun  6 00:31:33 2014
@@ -29,3 +29,7 @@ const char * const QD_CAPABILITY_ROUTER 
 const char * const QD_INTERNODE_LINK_NAME_1 = "qd.internode.1";
 const char * const QD_INTERNODE_LINK_NAME_2 = "qd.internode.2";
 
+const qd_amqp_error_t QD_AMQP_OK = { 200, "OK" };
+const qd_amqp_error_t QD_AMQP_BAD_REQUEST = { 400, "Bad Request" };
+const qd_amqp_error_t QD_AMQP_NOT_FOUND = { 404, "Not Found" };
+const qd_amqp_error_t QD_AMQP_NOT_IMPLEMENTED = { 501, "Not Implemented"};

Modified: qpid/dispatch/trunk/src/error.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/error.c?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/error.c (original)
+++ qpid/dispatch/trunk/src/error.c Fri Jun  6 00:31:33 2014
@@ -22,6 +22,7 @@
 #include <stdarg.h>
 #include <stdio.h>
 #include "static_assert.h"
+#include "log_private.h"
 
 static const char *error_names[] = {
  "No Error",
@@ -33,8 +34,8 @@ static const char *error_names[] = {
 
 STATIC_ASSERT(sizeof(error_names)/sizeof(error_names[0]) == QD_ERROR_COUNT, 
error_names_wrong_size);
 
-#define ERROR_MAX 512
-
+#define ERROR_MAX QD_LOG_TEXT_MAX
+const int QD_ERROR_MAX = ERROR_MAX;
 static __thread char error_message[ERROR_MAX];
 static __thread qd_error_t error_code = 0;
 static qd_log_source_t* log_source = 0;

Modified: qpid/dispatch/trunk/src/log.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/log.c?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/log.c (original)
+++ qpid/dispatch/trunk/src/log.c Fri Jun  6 00:31:33 2014
@@ -30,7 +30,7 @@
 #include <time.h>
 #include <syslog.h>
 
-#define TEXT_MAX 512
+#define TEXT_MAX QD_LOG_TEXT_MAX
 #define LIST_MAX 1000
 #define LOG_MAX 640
 
@@ -89,7 +89,7 @@ static log_sink_t* find_log_sink_lh(cons
 
 static log_sink_t* log_sink_lh(const char* name) {
     log_sink_t* sink = find_log_sink_lh(name);
-    if (sink) 
+    if (sink)
        sink->refcount++;
     else {
        sink = NEW(log_sink_t);
@@ -362,4 +362,3 @@ void qd_log_configure(const qd_dispatch_
     }
     qd_log(logging_log_source, QD_LOG_INFO, "Logging system configured");
 }
-

Modified: qpid/dispatch/trunk/src/log_private.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/log_private.h?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/log_private.h (original)
+++ qpid/dispatch/trunk/src/log_private.h Fri Jun  6 00:31:33 2014
@@ -23,5 +23,5 @@
 
 void qd_log_initialize(void);
 void qd_log_finalize(void);
-
+#define QD_LOG_TEXT_MAX 512
 #endif

Modified: qpid/dispatch/trunk/src/message.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/message.c?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/message.c (original)
+++ qpid/dispatch/trunk/src/message.c Fri Jun  6 00:31:33 2014
@@ -101,12 +101,11 @@ static const char REPR_END[] = "}\0";
 
 /* TODO aconway 2014-05-13: more detailed message representation. */
 char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len) {
+    qd_message_check(msg, QD_DEPTH_BODY);
     int i = 0;
     len -= sizeof(REPR_END);   /* Save space for ending */
     i += snprintf(buffer+i, len-i, "Message(%p){", msg);
-    if (!qd_message_check(msg, QD_DEPTH_BODY))
-       i += snprintf(buffer+i, len-i, "<%s>", qd_error_message());
-    i += copy_field(msg, QD_FIELD_TO, INT_MAX, " to='", "'", buffer+i, len-i);
+    i += copy_field(msg, QD_FIELD_TO, INT_MAX, "to='", "'", buffer+i, len-i);
     i += copy_field(msg, QD_FIELD_REPLY_TO, INT_MAX, " reply-to='", "'", 
buffer+i, len-i);
     i += copy_field(msg, QD_FIELD_BODY, 16, " body='", "'", buffer+i, len-i);
     assert(i <= len);

Modified: qpid/dispatch/trunk/tests/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/CMakeLists.txt?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/tests/CMakeLists.txt Fri Jun  6 00:31:33 2014
@@ -54,7 +54,7 @@ add_test(unit_tests            unit_test
 add_test(router_tests          ${PYTHON_EXECUTABLE} 
${CMAKE_CURRENT_SOURCE_DIR}/router_engine_test.py -v)
 add_test(management_tests      ${PYTHON_EXECUTABLE} 
${CMAKE_BINARY_DIR}/build_env.py ${PYTHON_EXECUTABLE} -m unittest -v management)
 
-set(SYSTEM_TEST_FILES system_test.py system_tests_one_router.py 
system_tests_two_routers.py system_tests_broker.py)
+set(SYSTEM_TEST_FILES system_test.py system_tests_one_router.py 
system_tests_two_routers.py system_tests_broker.py system_tests_management.py)
 
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config_build.sh.in 
${CMAKE_CURRENT_BINARY_DIR}/config_build.sh)
 

Added: qpid/dispatch/trunk/tests/management/amqp.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/management/amqp.py?rev=1600798&view=auto
==============================================================================
--- qpid/dispatch/trunk/tests/management/amqp.py (added)
+++ qpid/dispatch/trunk/tests/management/amqp.py Fri Jun  6 00:31:33 2014
@@ -0,0 +1,63 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied.  See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+"""Tests for management.amqp"""
+
+import unittest
+from qpid_dispatch_internal.management.amqp import Url
+
+class UrlTest(unittest.TestCase):
+
+    def test_url(self):
+        url = Url(scheme='amqp', user='me', password='secret', host='myhost', 
port=1234, name='foobar')
+        self.assertEqual(str(url), "amqp://me:secret@myhost:1234/foobar")
+        self.assertEqual(
+            [url.scheme, url.user, url.password, url.host, url.port, url.name],
+            ['amqp', 'me', 'secret', 'myhost', 1234, 'foobar']
+        )
+
+        self.assertEqual(str(url), "amqp://me:secret@myhost:1234/foobar")
+        self.assertNotEqual(str(url), "amqps://me:secret@myhost:1234/foobar")
+        self.assertNotEqual(str(url), "me:secret@myhost:1234/foobar")
+        self.assertNotEqual(str(url), "amqp://notme:secret@myhost:1234/foobar")
+        self.assertNotEqual(str(url), "amqp://me:notsecret@myhost:1234/foobar")
+        self.assertNotEqual(str(url), "amqp://me:secret@notmyhost:1234/foobar")
+        self.assertNotEqual(str(url), "amqp://me:secret@myhost:1234/notfoobar")
+        self.assertNotEqual(str(url), "amqp://me:secret@myhost:5555/foobar")
+
+        # Check that we allow None for scheme, port
+        url = Url(user='me', password='secret', host='myhost', name='foobar')
+        self.assertEqual(str(url), "me:secret@myhost/foobar")
+        self.assertEqual(
+            [url.scheme, url.user, url.password, url.host, url.port, url.name],
+            [None, 'me', 'secret', 'myhost', None, 'foobar']
+        )
+
+        # Scheme defaults
+        self.assertEqual(str(Url("me:secret@myhost/foobar").defaults()),
+                         "amqp://me:secret@myhost:5672/foobar")
+        # Correct port for amqps vs. amqps
+        
self.assertEqual(str(Url("amqps://me:secret@myhost/foobar").defaults()),
+                         "amqps://me:secret@myhost:5671/foobar")
+        self.assertEqual(str(Url("amqp://me:secret@myhost/foobar").defaults()),
+                         "amqp://me:secret@myhost:5672/foobar")
+
+        # Empty string for name
+        self.assertEqual(Url("myhost/").name, "")
+        self.assertIsNone(Url("myhost").name)

Propchange: qpid/dispatch/trunk/tests/management/amqp.py
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/dispatch/trunk/tests/system_test.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Fri Jun  6 00:31:33 2014
@@ -173,7 +173,7 @@ def wait_port(port, host='0.0.0.0', **re
         retry_exception(lambda: s.connect((host, port)), exception_test=check,
                         **retry_kwargs)
     except Exception, e:
-        raise Exception("wait_port timeout on %s:%s: %s"%(host, port, e))
+        raise Exception("wait_port timeout on host %s port %s: %s"%(host, 
port, e))
 
     finally: s.close()
 
@@ -187,6 +187,7 @@ def message(**properties):
     """Convenience to create a proton.Message with properties set"""
     m = Message()
     for name, value in properties.iteritems():
+        getattr(m, name)        # Raise exception if not a valid message attribute.
         setattr(m, name, value)
     return m
 
@@ -473,7 +474,7 @@ class Tester(object):
                 if a:
                     a()
                     break
-        os.chdir(self.save_dir)
+        if self.save_dir: os.chdir(self.save_dir)
 
     def cleanup(self, x):
         """Record object x for clean-up during tear-down.
@@ -553,30 +554,34 @@ class TestCase(unittest.TestCase, Tester
             cls.tester.teardown()
 
     def setUp(self):
+        # Hack to support setUpClass on older python.
+        # If the class has not already been set up, do it now.
+        if not hasattr(self.__class__, 'tester'):
+            self.setUpClass()
         # self.id() is normally the fully qualified method name
         Tester.setup(self, os.path.join(self.base_dir(), 
self.id().split(".")[-1]))
 
     def tearDown(self):
         Tester.teardown(self)
+        # Hack to support tearDownClass on older versions of python.
+        if hasattr(self.__class__, '_tear_down_class'):
+            self.tearDownClass()
 
     def skipTest(self, reason):
         """Workaround missing unittest.TestCase.skipTest in python 2.6.
         The caller must return in order to end the test"""
         if hasattr(unittest.TestCase, 'skipTest'):
-            self.skipTest(reason)
+            unittest.TestCase.skipTest(self, reason)
         else:
             print "Skipping test", id(), reason
 
-    # Hack to support setUpClass/tearDownClass on older versions of python.
+    # Hack to support tearDownClass on older versions of python.
     # The default TestLoader sorts tests alphabetically so we insert
-    # fake tests that will run first and last to call the class setup/teardown functions.
+    # a fake test that will run last to call tearDownClass.
     if not hasattr(unittest.TestCase, 'setUpClass'):
-        def test_0000_setup_class(self):
-            """Fake test to call setUpClass"""
-            self.setUpClass()
         def test_zzzz_teardown_class(self):
             """Fake test to call tearDownClass"""
-            self.tearDownClass()
+            self.__class__._tear_down_class = True
 
     def assert_fair(self, seq):
         avg = sum(seq)/len(seq)

Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Fri Jun  6 00:31:33 2014
@@ -22,7 +22,7 @@ System tests involving one or more broke
 with waypoints.
 """
 import unittest, system_test
-from system_test import wait_port, wait_ports, Qdrouterd, retry, message, 
MISSING_REQUIREMENTS
+from system_test import wait_port, Qdrouterd, message, MISSING_REQUIREMENTS
 from itertools import cycle
 
 class DistributedQueueTest(system_test.TestCase): # pylint: disable=too-many-public-methods

Added: qpid/dispatch/trunk/tests/system_tests_management.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_management.py?rev=1600798&view=auto
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_management.py (added)
+++ qpid/dispatch/trunk/tests/system_tests_management.py Fri Jun  6 00:31:33 
2014
@@ -0,0 +1,101 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied.  See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+"""System tests for management of qdrouter"""
+
+import unittest, system_test, re
+from qpid_dispatch_internal.management import amqp
+from system_test import Qdrouterd, MISSING_REQUIREMENTS
+from httplib import BAD_REQUEST, NOT_IMPLEMENTED
+
+class ManagementTest(system_test.TestCase): # pylint: disable=too-many-public-methods
+
+    @classmethod
+    def setUpClass(cls):
+        super(ManagementTest, cls).setUpClass()
+        name = cls.__name__
+        conf = Qdrouterd.Config([
+            ('log', {'module':'DEFAULT', 'level':'trace', 
'output':name+".log"}),
+            ('router', {'mode': 'standalone', 'router-id': name}),
+            ('listener', {'port':cls.get_port(), 'role':'normal'})
+        ])
+        cls.router = cls.tester.qdrouterd('%s'%name, conf)
+        cls.router.wait_ready()
+
+    def setUp(self):
+        super(ManagementTest, self).setUp()
+        self.node = self.cleanup(amqp.Node(self.router.addresses[0]))
+
+    def assertRaisesManagement(self, status, pattern, call, *args, **kwargs):
+        """Assert that call(*args, **kwargs) raises a ManagementError
+        with status and matching pattern in description """
+        try:
+            call(*args, **kwargs)
+            self.fail("Expected ManagementError with %s, %s"%(status, pattern))
+        except amqp.ManagementError, e:
+            self.assertEqual(e.status, status)
+            assert re.search("(?i)"+pattern, e.description), "No match for %s 
in %s"%(pattern, e.description)
+
+    def test_bad_query(self):
+        """Test that various badly formed queries get the proper response"""
+        self.assertRaisesManagement(
+            BAD_REQUEST, "No operation", self.node.call, self.node.request())
+        self.assertRaisesManagement(
+            NOT_IMPLEMENTED, "Not Implemented: nosuch",
+            self.node.call, self.node.request(operation="nosuch"))
+        self.assertRaisesManagement(
+            BAD_REQUEST, r'(entityType|attributeNames).*must be provided',
+            self.node.query)
+
+    def test_query_entity_type(self):
+        # FIXME aconway 2014-06-03: prefix support in Node, get from schema.
+        address = 'org.apache.qpid.dispatch.router.address'
+        response = self.node.query(entity_type=address)
+        self.assertEqual(response.attribute_names[0:3], ['type', 'name', 
'identity'])
+        for r in response.results:  # Check types
+            self.assertEqual(r[0], address)
+        names = [r[1] for r in response.results]
+        self.assertTrue('L$management' in names)
+        self.assertTrue('M0$management' in names)
+
+        # FIXME aconway 2014-06-05: negative test: offset, count not implemented on router
+        try:
+            # Try offset, count
+            self.assertGreater(len(names), 2)
+            response0 = self.node.query(entity_type=address, count=1)
+            self.assertEqual(names[0:1], [r[1] for r in response0.results])
+            response1_2 = self.node.query(entity_type=address, count=2, 
offset=1)
+            self.assertEqual(names[1:3], [r[1] for r in response1_2.results])
+            self.fail("Negative test passed!")
+        except: pass
+
+    def test_query_attribute_names(self):
+        response = self.node.query(attribute_names=["type", "name", 
"identity"])
+        # FIXME aconway 2014-06-05: negative test: attribute_names query doesn't work.
+        # Need a better test.
+        try:
+            self.assertNotEqual([], response.results)
+            self.fail("Negative test passed!")
+        except: pass
+
+if __name__ == '__main__':
+    if MISSING_REQUIREMENTS:
+        print MISSING_REQUIREMENTS
+    else:
+        unittest.main()

Propchange: qpid/dispatch/trunk/tests/system_tests_management.py
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/dispatch/trunk/tests/system_tests_one_router.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_one_router.py?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_one_router.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_one_router.py Fri Jun  6 00:31:33 
2014
@@ -17,13 +17,9 @@
 # under the License.
 #
 
-import sys
-import os
-import time
 import unittest
-import subprocess
 from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED
-from system_test import TestCase, message, Messenger, Qdrouterd
+from system_test import TestCase, Messenger, Qdrouterd
 
 class RouterTest(TestCase):
     """System tests involving a single router"""

Modified: qpid/dispatch/trunk/tools/qdtest.in
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdtest.in?rev=1600798&r1=1600797&r2=1600798&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdtest.in (original)
+++ qpid/dispatch/trunk/tools/qdtest.in Fri Jun  6 00:31:33 2014
@@ -36,5 +36,8 @@ python $QPID_DISPATCH_HOME/tests/system_
 echo "Running system_tests_broker.py"
 python $QPID_DISPATCH_HOME/tests/system_tests_broker.py -v
 
+echo "Running system_tests_management.py"
+python $QPID_DISPATCH_HOME/tests/system_tests_management.py -v
+
 echo "Running qdstat_test.sh"
 bash $QPID_DISPATCH_HOME/tests/qdstat_test.sh



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to