This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new a754e18  DISPATCH-975: Enforce max message size on message ingress
a754e18 is described below

commit a754e1897ece244c97b1d76163e7b96902316627
Author: Chuck Rolke <c...@apache.org>
AuthorDate: Wed Apr 15 10:33:25 2020 -0400

    DISPATCH-975: Enforce max message size on message ingress
    
    MaxMessageSize may be specified globally, per vhost, or per vhost user
    group. The global setting applies to all vhosts for which maxMessageSize
    is unspecified. The vhost setting applies to all vhost user groups for
    which maxMessageSize is unspecified. The vhost user group setting
    overrides all other settings. A maxMessageSize setting of zero disables
    maxMessageSize enforcement.
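
    The precedence described above can be sketched as a small lookup. This
    is an illustration only, not the router's code: the function name and
    arguments are hypothetical, and None stands for "unspecified", as in
    the policy_local.py change further down in this commit.

```python
def resolve_max_message_size(group_size, vhost_size, global_size):
    """Return the effective maxMessageSize for a connection.

    Hypothetical helper: None means the setting is unspecified at that
    level; 0 means enforcement is disabled.
    """
    if group_size is not None:
        # The vhost user group setting overrides all other settings.
        return group_size
    if vhost_size is not None:
        # The vhost setting applies when the group leaves it unspecified.
        return vhost_size
    # Fall back to the global policy value (schema default is 0).
    return global_size


# A group value of 0 is a real setting: it disables the check even
# though the vhost and global values are non-zero.
effective = resolve_max_message_size(0, 50000, 100000)
```

    Testing against None rather than truthiness matters here, because a
    zero at the group or vhost level must win over a non-zero value at a
    higher level.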
    
    Links over which maxMessageSize is being enforced will advertise the
    size in the max-message-size field of the Attach frame.
    
    Qpid-dispatch ignores the max-message-size field received in
    incoming Attach frames.
    
    Message size for maxMessageSize purposes is calculated to be the
    number of AMQP octets in the Annotated Message. This includes the
    header, delivery-annotations, message-annotations, properties,
    application-properties, application-data, and footer
    sections. Administrators and users must be aware that a "message"
    consisting of a single character string (the application-data) will be
    much larger over the wire after properties and annotations have been
    inserted.
    
    Max message size is enforced on message/transfer ingress only. Interior
    routers form a trusted domain where messages that have entered by one
    interior router are allowed to any destination in the system. Edge
    routers are not trusted; messages from an edge router must be checked
    again by the attached interior router before they are forwarded to the
    trusted network. Messages from an interior router to an edge router
    are not checked again by the edge router.
    
     * Transfer size is checked as a message enters an interior router.
    
      A message that passes an interior router size check is free to go to any
      destination on that router or any other interior or edge router.
    
     * Transfer size is checked as a message enters an edge router.
    
      A message that passes an edge router size check is free to go to any
      destination on that edge router. If the message is uplinked to an
      interior router then that interior router may apply another max message
      size check possibly using a different maximum message size setting.
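
    The ingress rules above can be summarized as a decision table. This is
    a sketch under stated assumptions: the function and the labels
    "client", "edge" and "interior" are hypothetical names chosen for the
    illustration and are not the router's configuration values.

```python
def size_check_applies(router_mode, peer):
    """Decide whether a receiving router checks message size.

    router_mode: "interior" or "edge", the router receiving the message.
    peer: "client", "edge" or "interior", what sent the message.
    All labels are hypothetical and used only for this sketch.
    """
    if router_mode == "interior":
        # Interior routers trust each other, but not clients or edge routers.
        return peer in ("client", "edge")
    if router_mode == "edge":
        # Edge routers check client ingress only; messages arriving from
        # the interior network were already checked on entry.
        return peer == "client"
    raise ValueError("unknown router mode: %r" % (router_mode,))
```

    A message sent by a client on an edge router and delivered through an
    interior router may therefore be checked twice, once at each hop,
    possibly against two different limits.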
    
    When a message exceeds max size then:
    
     * Disposition of rejected is returned to the sender for that delivery.
     * Copies of the message being delivered through the router network are
       aborted.
     * The connection to the sender is closed with:
    
           condition:   "amqp:connection:forced"
           description: "Message size exceeded"
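
    The detection step can be sketched as a running byte count, loosely
    modeled on the bytes_received / max_message_size fields this commit
    adds to the message content. The class below is a hypothetical
    illustration, not the C implementation.

```python
class IngressSizeTracker:
    """Track bytes received for one message against an optional limit.

    Hypothetical sketch: a max_message_size of 0 means no limit is
    enforced, matching the commit's "zero disables" rule.
    """

    def __init__(self, max_message_size):
        self.max_message_size = max_message_size
        self.bytes_received = 0
        self.oversize = False

    def on_bytes(self, count):
        """Account for newly received bytes; return True once oversize."""
        if self.max_message_size and not self.oversize:
            self.bytes_received += count
            # Strictly greater than: a message of exactly the maximum
            # size is still accepted.
            if self.bytes_received > self.max_message_size:
                self.oversize = True
        return self.oversize


tracker = IngressSizeTracker(10)
within_limit = tracker.on_bytes(10)   # exactly at the limit
over_limit = tracker.on_bytes(1)      # one byte past the limit
```

    Once the tracker reports oversize, the actions listed above apply:
    the delivery is rejected, downstream copies are aborted, and the
    sender's connection is closed.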
    
    Self test includes a four-router linear network with two interior and two
    edge routers. Tests try oversize and undersize messages with a variety of
    sender and receiver attachment points in that network. The test routers
    are configured with varying max message sizes.
    
    TODO:
    
     * No tests yet for link route.
     * No doc book pages describing feature configuration and behavior.
---
 include/qpid/dispatch/container.h                  |    2 +
 include/qpid/dispatch/message.h                    |    6 +
 include/qpid/dispatch/server.h                     |    2 +
 python/qpid_dispatch/management/qdrouter.json      |   23 +-
 python/qpid_dispatch_internal/management/config.py |    2 +
 .../qpid_dispatch_internal/policy/policy_local.py  |   37 +-
 .../policy/policy_manager.py                       |    8 +
 src/container.c                                    |    7 +-
 src/message.c                                      |   41 +-
 src/message_private.h                              |    3 +
 src/policy.c                                       |   48 +-
 src/policy.h                                       |   16 +
 src/router_core/delivery.c                         |    6 +
 src/router_core/delivery.h                         |    2 +
 src/router_core/transfer.c                         |    2 +-
 src/router_node.c                                  |  114 ++-
 src/server.c                                       |    4 +
 tests/CMakeLists.txt                               |    2 +
 tests/system_tests_policy_oversize_basic.py        |  792 +++++++++++++++
 tests/system_tests_policy_oversize_compound.py     | 1058 ++++++++++++++++++++
 20 files changed, 2111 insertions(+), 64 deletions(-)

diff --git a/include/qpid/dispatch/container.h 
b/include/qpid/dispatch/container.h
index 99eb3bd..054c6d5 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -238,6 +238,8 @@ void qd_session_free(qd_session_t *qd_ssn);
 bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn);
 qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn);
 
+void qd_connection_log_policy_denial(qd_link_t *link, const char *text);
+
 
 // handy macros to get around PROTON-2184: pn_session_set_context aborts if
 // context==0  (can remove this once qdrouter requires >= proton 0.31.x)
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 9ed2bc2..afc39c1 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -432,6 +432,12 @@ void qd_message_set_aborted(const qd_message_t *msg, bool 
aborted);
  */
 uint8_t qd_message_get_priority(qd_message_t *msg);
 
+/**
+ * True if message is larger than maxMessageSize
+ * @param msg A pointer to the message
+ * @return true if the message exceeded maxMessageSize on ingress
+ */
+bool qd_message_oversize(const qd_message_t *msg);
 
 ///@}
 
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 043baa5..abe5a9a 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -604,6 +604,8 @@ bool qd_connection_strip_annotations_in(const 
qd_connection_t *c);
 
 void qd_connection_wake(qd_connection_t *ctx);
 
+int qd_connection_max_message_size(const qd_connection_t *c);
+
 /**
  * @}
  */
diff --git a/python/qpid_dispatch/management/qdrouter.json 
b/python/qpid_dispatch/management/qdrouter.json
index 9254b7d..ec678d0 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1869,6 +1869,13 @@
                     "required": false,
                     "create": true
                 },
+                "maxMessageSize": {
+                    "type": "integer",
+                    "default": 0,
+                    "description": "The maximum size in bytes of AMQP message 
transfers allowed for this router. This limit is applied only to transfers over 
user connections and is not applied to interrouter or edge router connections. 
This limit may be overridden by vhost or by vhost user group settings. A value 
of zero disables this limit.",
+                    "required": false,
+                    "create": true
+                },
                 "enableVhostPolicy": {
                     "type": "boolean",
                     "default": false,
@@ -1905,10 +1912,11 @@
                     "graph": true,
                     "description": "The sum of all vhost sender and receiver 
denials."
                 },
+                "maxMessageSizeDenied": {"type": "integer", "graph": true},
                 "totalDenials": {
                     "type": "integer",
                     "graph": true,
-                    "description": "The total number of connection and link 
denials."
+                    "description": "The total number of connection, link, and 
transfer denials."
                 }
             }
         },
@@ -1933,6 +1941,11 @@
                     "create": true,
                     "update": true
                 },
+                "maxMessageSize": {
+                    "type": "integer",
+                    "description": "Optional maximum size in bytes of AMQP 
message transfers allowed for connections to this vhost. This limit overrides 
the policy maxMessageSize value and may be overridden by vhost user group 
settings. A value of zero disables this limit.",
+                    "required": false
+                },
                 "maxConnectionsPerUser": {
                     "type": "integer",
                     "default": 65535,
@@ -1993,6 +2006,11 @@
                     "required": false,
                     "create": false
                 },
+                "maxMessageSize": {
+                    "type": "integer",
+                    "description": "Optional maximum size in bytes of AMQP 
message transfers allowed for connections created by users in this group. This 
limit overrides the policy and vhost maxMessageSize values. A value of zero 
disables this limit.",
+                    "required": false
+                },
                 "maxFrameSize": {
                     "type": "integer",
                     "description": "The largest frame, in bytes, that may be 
sent on this connection. Non-zero policy values overwrite values specified for 
a listener object (AMQP Open, max-frame-size).",
@@ -2130,7 +2148,8 @@
 
                 "sessionDenied": {"type": "integer", "graph": true},
                 "senderDenied": {"type": "integer", "graph": true},
-                "receiverDenied": {"type": "integer", "graph": true}
+                "receiverDenied": {"type": "integer", "graph": true},
+                "maxMessageSizeDenied": {"type": "integer", "graph": true}
             }
         },
 
diff --git a/python/qpid_dispatch_internal/management/config.py 
b/python/qpid_dispatch_internal/management/config.py
index 8cf1940..bd1595f 100644
--- a/python/qpid_dispatch_internal/management/config.py
+++ b/python/qpid_dispatch_internal/management/config.py
@@ -201,10 +201,12 @@ def configure_dispatch(dispatch, lib_handle, filename):
     policyDir           = config.by_type('policy')[0]['policyDir']
     policyDefaultVhost  = config.by_type('policy')[0]['defaultVhost']
     useHostnamePatterns = 
config.by_type('policy')[0]['enableVhostNamePatterns']
+    maxMessageSize      = config.by_type('policy')[0]['maxMessageSize']
     for a in config.by_type("policy"):
         configure(a)
     agent.policy.set_default_vhost(policyDefaultVhost)
     agent.policy.set_use_hostname_patterns(useHostnamePatterns)
+    agent.policy.set_max_message_size(maxMessageSize)
 
     # Remaining configuration
     for t in "sslProfile", "authServicePlugin", "listener", "connector", \
diff --git a/python/qpid_dispatch_internal/policy/policy_local.py 
b/python/qpid_dispatch_internal/policy/policy_local.py
index 37733f1..cfe3400 100644
--- a/python/qpid_dispatch_internal/policy/policy_local.py
+++ b/python/qpid_dispatch_internal/policy/policy_local.py
@@ -120,7 +120,7 @@ class PolicyCompiler(object):
     """
     Validate incoming configuration for legal schema.
     - Warn about section options that go unused.
-    - Disallow negative max connection numbers.
+    - Disallow negative max connection/message size numbers.
     - Check that connectionOrigins resolve to IP hosts.
     - Enforce internal consistency,
     """
@@ -131,6 +131,7 @@ class PolicyCompiler(object):
         PolicyKeys.KW_IGNORED_TYPE,
         PolicyKeys.KW_VHOST_NAME,
         PolicyKeys.KW_MAXCONN,
+        PolicyKeys.KW_MAX_MESSAGE_SIZE,
         PolicyKeys.KW_MAXCONNPERHOST,
         PolicyKeys.KW_MAXCONNPERUSER,
         PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT,
@@ -232,7 +233,7 @@ class PolicyCompiler(object):
 
     def compile_app_settings(self, vhostname, usergroup, policy_in, 
policy_out, warnings, errors):
         """
-        Compile a schema from processed json format to local internal format.
+        Compile a vhostUserGroupSettings schema from processed json format to 
local internal format.
         @param[in] name vhost name
         @param[in] policy_in user config settings
         @param[out] policy_out validated Internal format
@@ -248,7 +249,7 @@ class PolicyCompiler(object):
         policy_out[PolicyKeys.KW_REMOTE_HOSTS] = ''
         # DISPATCH-1277 - KW_MAX_FRAME_SIZE must be defaulted to 16384 not 
2147483647
         policy_out[PolicyKeys.KW_MAX_FRAME_SIZE] = 16384
-        policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = 0
+        policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = None
         policy_out[PolicyKeys.KW_MAX_SESSION_WINDOW] = 2147483647
         policy_out[PolicyKeys.KW_MAX_SESSIONS] = 65536
         policy_out[PolicyKeys.KW_MAX_SENDERS] = 2147483647
@@ -411,7 +412,7 @@ class PolicyCompiler(object):
 
     def compile_access_ruleset(self, name, policy_in, policy_out, warnings, 
errors):
         """
-        Compile a schema from processed json format to local internal format.
+        Compile a vhost schema from processed json format to local internal 
format.
         @param[in] name vhost name
         @param[in] policy_in raw policy to be validated
         @param[out] policy_out validated Internal format
@@ -429,6 +430,7 @@ class PolicyCompiler(object):
         policy_out[PolicyKeys.KW_MAXCONNPERUSER] = 65535
         policy_out[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT] = False
         policy_out[PolicyKeys.KW_GROUPS] = {}
+        policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = None
 
         # validate the options
         for key, val in dict_iteritems(policy_in):
@@ -445,6 +447,14 @@ class PolicyCompiler(object):
                     errors.append(msg)
                     return False
                 policy_out[key] = val
+            elif key in [PolicyKeys.KW_MAX_MESSAGE_SIZE
+                       ]:
+                if not self.validateNumber(val, 0, 0, cerror):
+                    msg = ("Policy vhost '%s' option '%s' has error '%s'." %
+                           (name, key, cerror[0]))
+                    errors.append(msg)
+                    return False
+                policy_out[key] = val
             elif key in [PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]:
                 if not type(val) is bool:
                     errors.append("Policy vhost '%s' option '%s' must be of 
type 'bool' but is '%s'" %
@@ -609,6 +619,9 @@ class PolicyLocal(object):
         #  When true policy ruleset definitions are propagated to C code
         self.use_hostname_patterns = False
 
+        # _max_message_size
+        #  holds global value from policy config object
+        self._max_message_size = 0
     #
     # Service interfaces
     #
@@ -830,6 +843,13 @@ class PolicyLocal(object):
 
             upolicy.update(ruleset[PolicyKeys.KW_GROUPS][groupname])
 
+            maxsize = upolicy.get(PolicyKeys.KW_MAX_MESSAGE_SIZE, None)
+            if maxsize is None:
+                maxsize = ruleset.get(PolicyKeys.KW_MAX_MESSAGE_SIZE, None)
+                if maxsize is None:
+                    maxsize = self._max_message_size
+                upolicy[PolicyKeys.KW_MAX_MESSAGE_SIZE] = maxsize
+
             upolicy[PolicyKeys.KW_CSTATS] = self.statsdb[vhost].get_cstats()
             return True
         except Exception as e:
@@ -851,6 +871,15 @@ class PolicyLocal(object):
             self._manager.log_trace(
                 "Policy internal error closing connection id %s. %s" % 
(conn_id, str(e)))
 
+    def set_max_message_size(self, size):
+        """
+        record max message size from policy config object
+        :param size:
+        :return: none
+
+        """
+        self._max_message_size = size
+
     #
     #
     def test_load_config(self):
diff --git a/python/qpid_dispatch_internal/policy/policy_manager.py 
b/python/qpid_dispatch_internal/policy/policy_manager.py
index fe5e4e8..f7f35fb 100644
--- a/python/qpid_dispatch_internal/policy/policy_manager.py
+++ b/python/qpid_dispatch_internal/policy/policy_manager.py
@@ -161,6 +161,14 @@ class PolicyManager(object):
         @return: none
         """
         self._policy_local.close_connection(conn_id)
+
+    def set_max_message_size(self, size):
+        """
+        Policy has set global maxMessageSize.
+        :param size:
+        :return: none
+        """
+        self._policy_local.set_max_message_size(size)
 #
 #
 #
diff --git a/src/container.c b/src/container.c
index d08800b..a728de7 100644
--- a/src/container.c
+++ b/src/container.c
@@ -159,7 +159,7 @@ static void setup_outgoing_link(qd_container_t *container, 
pn_link_t *pn_link)
 }
 
 
-static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link)
+static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, 
int max_size)
 {
     qd_node_t *node = container->default_node;
 
@@ -191,6 +191,9 @@ static void setup_incoming_link(qd_container_t *container, 
pn_link_t *pn_link)
     link->node       = node;
     link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(pn_link);
 
+    if (max_size) {
+        pn_link_set_max_message_size(pn_link, (uint64_t)max_size);
+    }
     pn_link_set_context(pn_link, link);
     node->ntype->incoming_handler(node->context, link);
 }
@@ -652,7 +655,7 @@ void qd_container_handle_event(qd_container_t *container, 
pn_event_t *event,
                         }
                         qd_conn->n_senders++;
                     }
-                    setup_incoming_link(container, pn_link);
+                    setup_incoming_link(container, pn_link, 
qd_connection_max_message_size(qd_conn));
                 }
             } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
                 handle_link_open(container, pn_link);
diff --git a/src/message.c b/src/message.c
index a3420a1..0d7b08b 100644
--- a/src/message.c
+++ b/src/message.c
@@ -29,6 +29,7 @@
 #include "compose_private.h"
 #include "connection_manager_private.h"
 #include "aprintf.h"
+#include "policy.h"
 #include <string.h>
 #include <ctype.h>
 #include <stdio.h>
@@ -941,7 +942,6 @@ qd_message_t *qd_message()
     msg->content->lock = sys_mutex();
     sys_atomic_init(&msg->content->ref_count, 1);
     msg->content->parse_depth = QD_DEPTH_NONE;
-
     return (qd_message_t*) msg;
 }
 
@@ -1244,14 +1244,13 @@ void qd_message_set_tag_sent(qd_message_t *in_msg, bool 
tag_sent)
 /**
  * Receive and discard large messages for which there is no destination.
  * Don't waste resources by putting the message into internal buffers.
- * Don't fiddle with locking as no sender is competing with reception.
+ * Message locking is not required since the message content buffers are 
untouched.
  */
 qd_message_t *discard_receive(pn_delivery_t *delivery,
                               pn_link_t     *link,
                               qd_message_t  *msg_in)
 {
     qd_message_pvt_t *msg  = (qd_message_pvt_t*)msg_in;
-
     while (1) {
 #define DISCARD_BUFFER_SIZE (128 * 1024)
         char dummy[DISCARD_BUFFER_SIZE];
@@ -1261,13 +1260,17 @@ qd_message_t *discard_receive(pn_delivery_t *delivery,
             // have read all available pn_link incoming bytes
             break;
         } else if (rc == PN_EOS || rc < 0) {
-            // end of message or error. Call the message complete
-            msg->content->receive_complete = true;
+            // End of message or error: finalize message_receive handling
             msg->content->aborted = pn_delivery_aborted(delivery);
             qd_nullify_safe_ptr(&msg->content->input_link_sp);
-
             pn_record_t *record = pn_delivery_attachments(delivery);
             pn_record_set(record, PN_DELIVERY_CTX, 0);
+            if (msg->content->oversize) {
+                // Aborting the content disposes of downstream copies.
+                // This has no effect on the received message.
+                msg->content->aborted = true;
+            }
+            msg->content->receive_complete = true;
             break;
         } else {
             // rc was > 0. bytes were read and discarded.
@@ -1308,11 +1311,13 @@ qd_message_t *qd_message_receive(pn_delivery_t 
*delivery)
         msg->strip_annotations_in  = qd_connection_strip_annotations_in(qdc);
         pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
         pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
+        msg->content->max_message_size = qd_connection_max_message_size(qdc);
     }
 
     //
     // The discard flag indicates we should keep reading the input stream
     // but not process the message for delivery.
+    // Oversize messages are also discarded.
     //
     if (msg->content->discard) {
         return discard_receive(delivery, link, (qd_message_t *)msg);
@@ -1416,10 +1421,24 @@ qd_message_t *qd_message_receive(pn_delivery_t 
*delivery)
             recv_error = true;
         } else if (rc > 0) {
             //
-            // We have received a positive number of bytes for the message.  
Advance
-            // the cursor in the buffer.
+            // We have received a positive number of bytes for the message.  
+            // Advance the cursor in the buffer.
             //
             qd_buffer_insert(content->pending, rc);
+
+            // Handle maxMessageSize violations
+            if (content->max_message_size) {
+                content->bytes_received += rc;
+                if (content->bytes_received > content->max_message_size)
+                {
+                    qd_connection_t *conn = qd_link_connection(qdl);
+                    qd_connection_log_policy_denial(qdl, "DENY AMQP Transfer 
maxMessageSize exceeded");
+                    qd_policy_count_max_size_event(link, conn);
+                    content->discard = true;
+                    content->oversize = true;
+                    return discard_receive(delivery, link, (qd_message_t*)msg);
+                }
+            }
         } else {
             //
             // We received zero bytes, and no PN_EOS.  This means that we've 
received
@@ -2231,3 +2250,9 @@ void qd_message_set_aborted(const qd_message_t *msg, bool 
aborted)
     qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg;
     msg_pvt->content->aborted = aborted;
 }
+
+bool qd_message_oversize(const qd_message_t *msg)
+{
+    qd_message_content_t * mc = MSG_CONTENT(msg);
+    return mc->oversize;
+}
diff --git a/src/message_private.h b/src/message_private.h
index 7bc66dc..ba13538 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -109,6 +109,8 @@ typedef struct {
     qd_parsed_field_t   *ma_pf_to_override;
     qd_parsed_field_t   *ma_pf_trace;
     int                  ma_int_phase;
+    int                  max_message_size;               // configured max; 0 
if no max to enforce
+    int                  bytes_received;                 // bytes returned by 
pn_link_recv() when enforcing max_message_size
     uint32_t             fanout;                         // The number of 
receivers for this message, including in-process subscribers.
     qd_link_t_sp         input_link_sp;                  // message received 
on this link
 
@@ -120,6 +122,7 @@ typedef struct {
     bool                 disable_q2_holdoff;             // Disable the Q2 
flow control
     bool                 priority_parsed;
     bool                 priority_present;
+    bool                 oversize;                       // policy oversize 
handling in effect
     uint8_t              priority;                       // The priority of 
this message
 } qd_message_content_t;
 
diff --git a/src/policy.c b/src/policy.c
index 896540c..4a45bed 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -46,6 +46,7 @@ static uint64_t n_connections = 0;
 static uint64_t n_denied = 0;
 static uint64_t n_processed = 0;
 static uint64_t n_links_denied = 0;
+static uint64_t n_maxsize_messages_denied = 0;
 static uint64_t n_total_denials = 0;
 
 //
@@ -83,6 +84,9 @@ static PyObject * module = 0;
 
 ALLOC_DEFINE(qd_policy_settings_t);
 
+// Policy log module used outside of policy proper
+qd_log_source_t* policy_log_source = 0;
+
 //
 // Policy configuration/statistics management interface
 //
@@ -116,6 +120,7 @@ qd_policy_t *qd_policy(qd_dispatch_t *qd)
     policy->tree_lock            = sys_mutex();
     policy->hostname_tree        = qd_parse_tree_new(QD_PARSE_TREE_ADDRESS);
     stats_lock                   = sys_mutex();
+    policy_log_source            = policy->log_source;
 
     qd_log(policy->log_source, QD_LOG_TRACE, "Policy Initialized");
     return policy;
@@ -206,7 +211,8 @@ qd_error_t qd_policy_c_counts_refresh(long ccounts, 
qd_entity_t *entity)
     qd_policy_denial_counts_t *dc = (qd_policy_denial_counts_t*)ccounts;
     if (!qd_entity_set_long(entity, "sessionDenied", dc->sessionDenied) &&
         !qd_entity_set_long(entity, "senderDenied", dc->senderDenied) &&
-        !qd_entity_set_long(entity, "receiverDenied", dc->receiverDenied)
+        !qd_entity_set_long(entity, "receiverDenied", dc->receiverDenied) &&
+        !qd_entity_set_long(entity, "maxMessageSizeDenied", 
dc->maxSizeMessagesDenied)
     )
         return QD_ERROR_NONE;
     return qd_error_code();
@@ -218,13 +224,14 @@ qd_error_t qd_policy_c_counts_refresh(long ccounts, 
qd_entity_t *entity)
  **/
 qd_error_t qd_entity_refresh_policy(qd_entity_t* entity, void *unused) {
     // Return global stats
-    uint64_t np, nd, nc, nl, nt;
+    uint64_t np, nd, nc, nl, nm, nt;
     sys_mutex_lock(stats_lock);
     {
         np = n_processed;
         nd = n_denied;
         nc = n_connections;
         nl = n_links_denied;
+        nm = n_maxsize_messages_denied;
         nt = n_total_denials;
     }
     sys_mutex_unlock(stats_lock);
@@ -232,6 +239,7 @@ qd_error_t qd_entity_refresh_policy(qd_entity_t* entity, 
void *unused) {
         !qd_entity_set_long(entity, "connectionsDenied", nd) &&
         !qd_entity_set_long(entity, "connectionsCurrent", nc) &&
         !qd_entity_set_long(entity, "linksDenied", nl) &&
+        !qd_entity_set_long(entity, "maxMessageSizeDenied", nm) &&
         !qd_entity_set_long(entity, "totalDenials", nt)
     )
         return QD_ERROR_NONE;
@@ -303,19 +311,21 @@ void qd_policy_socket_close(qd_policy_t *policy, const 
qd_connection_t *conn)
         qd_python_unlock(lock_state);
     }
     const char *hostname = qd_connection_name(conn);
-    int ssnDenied = 0;
-    int sndDenied = 0;
-    int rcvDenied = 0;
+    uint64_t ssnDenied = 0;
+    uint64_t sndDenied = 0;
+    uint64_t rcvDenied = 0;
+    uint64_t sizDenied = 0;
     if (conn->policy_settings && conn->policy_settings->denialCounts) {
         ssnDenied = conn->policy_settings->denialCounts->sessionDenied;
         sndDenied = conn->policy_settings->denialCounts->senderDenied;
         rcvDenied = conn->policy_settings->denialCounts->receiverDenied;
-    }
-    qd_log(policy->log_source, QD_LOG_DEBUG, 
+        sizDenied = conn->policy_settings->denialCounts->maxSizeMessagesDenied;
+        qd_log(policy->log_source, QD_LOG_DEBUG,
            "[C%"PRIu64"] Connection '%s' closed with resources n_sessions=%d, 
n_senders=%d, n_receivers=%d, "
-           "sessions_denied=%d, senders_denied=%d, receivers_denied=%d. 
nConnections= %d.",
+           "sessions_denied=%"PRIu64", senders_denied=%"PRIu64", receivers_denied=%"PRIu64", max_message_size_denied=%"PRIu64". nConnections= %"PRIu64".",
             conn->connection_id, hostname, conn->n_sessions, conn->n_senders, 
conn->n_receivers,
-            ssnDenied, sndDenied, rcvDenied, n_connections);
+            ssnDenied, sndDenied, rcvDenied, sizDenied, n_connections);
+    }
 }
 
 
@@ -502,6 +512,7 @@ bool qd_policy_open_fetch_settings(
                         settings->maxSessions          = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0);
                         settings->maxSenders           = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0);
                         settings->maxReceivers         = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0);
+                        settings->maxMessageSize       = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0);
                         if (!settings->allowAnonymousSender) { //don't 
override if enabled by authz plugin
                             settings->allowAnonymousSender = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false);
                         }
@@ -666,6 +677,20 @@ void _qd_policy_deny_amqp_receiver_link(pn_link_t 
*pn_link, qd_connection_t *qd_
 }
 
 
+//
+//
+void qd_policy_count_max_size_event(pn_link_t *link, qd_connection_t *qd_conn)
+{
+    sys_mutex_lock(stats_lock);
+    n_maxsize_messages_denied++;
+    n_total_denials++;
+    sys_mutex_unlock(stats_lock);
+    // TODO: denialCounts is shared among connections and should be protected 
also
+    if (qd_conn->policy_settings && qd_conn->policy_settings->denialCounts) {
+        qd_conn->policy_settings->denialCounts->maxSizeMessagesDenied++;
+    }
+}
+
 /**
  * Given a char return true if it is a parse_tree token separator
  */
@@ -1474,3 +1499,8 @@ char * qd_policy_compile_allowed_csv(char * csv)
     free(dup);
     return result;
 }
+
+
+qd_log_source_t* qd_policy_log_source() {
+    return policy_log_source;
+}
diff --git a/src/policy.h b/src/policy.h
index a076eb3..44e8794 100644
--- a/src/policy.h
+++ b/src/policy.h
@@ -39,6 +39,7 @@ struct qd_policy_denial_counts_s {
     uint64_t sessionDenied;
     uint64_t senderDenied;
     uint64_t receiverDenied;
+    uint64_t maxSizeMessagesDenied;
 };
 
 typedef struct qd_policy_t qd_policy_t;
@@ -49,6 +50,7 @@ struct qd_policy__settings_s {
     int  maxSessions;
     int  maxSenders;
     int  maxReceivers;
+    int  maxMessageSize;
     bool allowDynamicSource;
     bool allowAnonymousSender;
     bool allowUserIdProxy;
@@ -231,6 +233,7 @@ char * qd_policy_host_pattern_lookup(qd_policy_t *policy, 
const char *hostPatter
  * @return the ruleset string to be used in policy settings.
  */
 char * qd_policy_compile_allowed_csv(char * csv);
+
 /**
  * Approve sending of message on anonymous link based on connection's policy.
  *
@@ -238,4 +241,17 @@ char * qd_policy_compile_allowed_csv(char * csv);
  * @param[in] qd_conn dispatch connection with policy settings
  */
 bool qd_policy_approve_message_target(qd_iterator_t *address, qd_connection_t 
*qd_conn);
+
+/**
+ * Increment counters for a link when policy maxMessageSize limit is exceeded.
+ *
+ * @param[in] link proton link with the delivery/transfer being rejected
+ * @param[in] qd_conn dispatch connection with policy settings and counts
+ **/
+void qd_policy_count_max_size_event(pn_link_t *link, qd_connection_t *qd_conn);
+
+/**
+ * Return the POLICY log_source so code outside of policy proper can log policy events
+ */
+qd_log_source_t* qd_policy_log_source();
 #endif
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 715d8d0..9e0400e 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -50,6 +50,12 @@ qdr_link_t *qdr_delivery_link(const qdr_delivery_t *delivery)
 }
 
 
+bool qdr_delivery_oversize(const qdr_delivery_t *delivery)
+{
+    return delivery && delivery->msg && qd_message_oversize(delivery->msg);
+}
+
+
 bool qdr_delivery_send_complete(const qdr_delivery_t *delivery)
 {
     if (!delivery)
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index 9a7edfb..8807996 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -76,6 +76,7 @@ ALLOC_DECLARE(qdr_delivery_t);
 
 bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery);
 bool qdr_delivery_send_complete(const qdr_delivery_t *delivery);
+bool qdr_delivery_oversize(const qdr_delivery_t *delivery);
 
 void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context);
 void *qdr_delivery_get_context(const qdr_delivery_t *delivery);
@@ -89,6 +90,7 @@ uint64_t qdr_delivery_disposition(const qdr_delivery_t 
*delivery);
 void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t 
disposition);
 
 void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted);
+bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery);
 
 qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
 qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery);
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index f885eef..3dab635 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -185,7 +185,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, 
qdr_link_t *link, int credit)
                     DEQ_REMOVE_HEAD(link->undelivered);
                     dlv->link_work = 0;
 
-                    if (settled) {
+                    if (settled || qdr_delivery_oversize(dlv) || 
qdr_delivery_is_aborted(dlv)) {
                         dlv->where = QDR_DELIVERY_NOWHERE;
                         qdr_delivery_decref(core, dlv, 
"qdr_link_process_deliveries - remove from undelivered list");
                     } else {
diff --git a/src/router_node.c b/src/router_node.c
index a735280..d2e2d36 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -27,6 +27,7 @@
 #include "entity_cache.h"
 #include "router_private.h"
 #include "delivery.h"
+#include "policy.h"
 #include <qpid/dispatch/router_core.h>
 #include <qpid/dispatch/proton_utils.h>
 #include <proton/sasl.h>
@@ -287,8 +288,9 @@ static void log_link_message(qd_connection_t *conn, 
pn_link_t *pn_link, qd_messa
     const qd_server_config_t *cf = qd_connection_config(conn);
     if (!cf) return;
     char buf[qd_message_repr_len()];
-    const char *msg_str = qd_message_aborted(msg) ?
-        "aborted message" : qd_message_repr(msg, buf, sizeof(buf), 
cf->log_bits);
+    const char *msg_str = qd_message_oversize(msg) ? "oversize message" :
+                          qd_message_aborted(msg) ? "aborted message" : 
+                          qd_message_repr(msg, buf, sizeof(buf), cf->log_bits);
     if (msg_str) {
         const char *src = pn_terminus_get_address(pn_link_source(pn_link));
         const char *tgt = pn_terminus_get_address(pn_link_target(pn_link));
@@ -335,49 +337,67 @@ static bool AMQP_rx_handler(void* context, qd_link_t 
*link)
     qd_message_t   *msg   = qd_message_receive(pnd);
     bool receive_complete = qd_message_receive_complete(msg);
 
-    if (receive_complete) {
-        log_link_message(conn, pn_link, msg);
+    if (!qd_message_oversize(msg)) {
+        // message not rejected as oversize
+        if (receive_complete) {
+            log_link_message(conn, pn_link, msg);
 
-        //
-        // The entire message has been received and we are ready to consume 
the delivery by calling pn_link_advance().
-        //
-        pn_link_advance(pn_link);
-        next_delivery = pn_link_current(pn_link) != 0;
+            //
+            // The entire message has been received and we are ready to 
consume the delivery by calling pn_link_advance().
+            //
+            pn_link_advance(pn_link);
+            next_delivery = pn_link_current(pn_link) != 0;
 
-        uint64_t local_disp = qdr_delivery_disposition(delivery);
-        if (local_disp != 0) {
-            pn_delivery_update(pnd, local_disp);
+            uint64_t local_disp = qdr_delivery_disposition(delivery);
+            if (local_disp != 0) {
+                pn_delivery_update(pnd, local_disp);
+            }
         }
-    }
 
-    if (qd_message_is_discard(msg)) {
-        //
-        // Message has been marked for discard, no further processing necessary
-        //
+        if (qd_message_is_discard(msg)) {
+            //
+            // Message has been marked for discard, no further processing 
necessary
+            //
+            if (receive_complete) {
+                // If this discarded delivery has already been settled by 
proton,
+                // set the presettled flag on the delivery to true if it is 
not already true.
+                // Since the entire message has already been received, we 
directly call the
+                // function to set the pre-settled flag since we cannot go 
thru the core-thread
+                // to do this since the delivery has been discarded.
+                // Discarded streaming deliveries are not put thru the core 
thread via the continue action.
+                if (pn_delivery_settled(pnd))
+                    qdr_delivery_set_presettled(delivery);
+
+
+                // note: expected that the code that set discard has handled
+                // setting disposition and updating flow!
+                pn_delivery_settle(pnd);
+                if (delivery) {
+                    // if delivery already exists then the core thread 
discarded this
+                    // delivery, it will eventually free the qdr_delivery_t 
and its
+                    // associated message - do not free it here.
+                    qdr_node_disconnect_deliveries(router->router_core, link, 
delivery, pnd);
+                } else {
+                    qd_message_free(msg);
+                }
+            }
+            return next_delivery;
+        }
+    } else {
+        // message is oversize
         if (receive_complete) {
-            // If this discarded delivery has already been settled by proton,
-            // set the presettled flag on the delivery to true if it is not 
already true.
-            // Since the entire message has already been received, we directly 
call the
-            // function to set the pre-settled flag since we cannot go thru 
the core-thread
-            // to do this since the delivery has been discarded.
-            // Discarded streaming deliveries are not put thru the core thread 
via the continue action.
-            if (pn_delivery_settled(pnd))
-                qdr_delivery_set_presettled(delivery);
-
-
-            // note: expected that the code that set discard has handled
-            // setting disposition and updating flow!
+            // reject and settle the incoming delivery
+            pn_delivery_update(pnd, PN_REJECTED);
             pn_delivery_settle(pnd);
-            if (delivery) {
-                // if delivery already exists then the core thread discarded 
this
-                // delivery, it will eventually free the qdr_delivery_t and its
-                // associated message - do not free it here.
-                qdr_node_disconnect_deliveries(router->router_core, link, 
delivery, pnd);
-            } else {
-                qd_message_free(msg);
-            }
+            // close the connection
+            pn_connection_t * pn_conn = qd_connection_pn(conn);
+            pn_condition_t * cond = pn_connection_condition(pn_conn);
+            (void) pn_condition_set_name(       cond, 
QD_AMQP_COND_CONNECTION_FORCED);
+            (void) pn_condition_set_description(cond, "Message size exceeded");
+            pn_connection_close(pn_conn);
         }
-        return next_delivery;
+        return false;
+        // oversize messages are not processed any further
     }
 
     //
@@ -1913,3 +1933,21 @@ void qd_link_restart_rx(qd_link_t *in_link)
         qd_connection_invoke_deferred(in_conn, deferred_AMQP_rx_handler, 
safe_ptr);
     }
 }
+
+
+// Issue a warning POLICY log message with connection and link identities
+// prepended to the policy denial text string.
+void qd_connection_log_policy_denial(qd_link_t *link, const char *text)
+{
+    qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
+    uint64_t l_id = 0;
+    uint64_t c_id = 0;
+    if (rlink) {
+        l_id = rlink->identity;
+        if (rlink->conn) {
+            c_id = rlink->conn->identity;
+        }
+    }    
+    qd_log(qd_policy_log_source(), QD_LOG_WARNING, "[C%"PRIu64"][L%"PRIu64"] 
%s",
+           c_id, l_id, text);
+}
diff --git a/src/server.c b/src/server.c
index b3a553e..c0e57c3 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1665,3 +1665,7 @@ sys_mutex_t *qd_server_get_activation_lock(qd_server_t * 
server)
 {
     return server->conn_activation_lock;
 }
+
+int qd_connection_max_message_size(const qd_connection_t *c) {
+    return (c && c->policy_settings) ? c->policy_settings->maxMessageSize : 0;
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index e34db23..041ee26 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -99,6 +99,8 @@ foreach(py_test_module
     system_tests_default_distribution
     system_tests_deprecated
     system_tests_policy
+    system_tests_policy_oversize_basic
+    system_tests_policy_oversize_compound
     system_tests_protocol_family
     system_tests_protocol_settings
     system_tests_qdmanage
diff --git a/tests/system_tests_policy_oversize_basic.py 
b/tests/system_tests_policy_oversize_basic.py
new file mode 100644
index 0000000..8040817
--- /dev/null
+++ b/tests/system_tests_policy_oversize_basic.py
@@ -0,0 +1,792 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest as unittest
+import os, json, re, signal
+import sys
+import time
+
+from system_test import TestCase, Qdrouterd, main_module, Process, TIMEOUT, 
DIR, QdManager, Logger
+from subprocess import PIPE, STDOUT
+from proton import ConnectionException, Timeout, Url, symbol, Message
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, ReceiverOption
+from proton.utils import BlockingConnection, LinkDetached, SyncRequestResponse
+from qpid_dispatch_internal.policy.policy_util import is_ipv6_enabled
+from qpid_dispatch_internal.compat import dict_iteritems
+from test_broker import FakeBroker
+
+# How many worker threads?
+W_THREADS = 2
+
+# Define oversize denial condition
+OVERSIZE_CONDITION_NAME = "amqp:connection:forced"
+OVERSIZE_CONDITION_DESC = "Message size exceeded"
+
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+#
+# DISPATCH-975 Detect that an oversize message is blocked.
+# These tests check basic blocking where the sender is blocked by
+# the ingress routers. They do not check compound blocking where
+# oversize is allowed or denied by an ingress edge router but also
+# denied by the uplink interior router.
+
+class OversizeMessageTransferTest(MessagingHandler):
+    """
+    This test connects a sender and a receiver. Then it tries to send _count_
+    number of messages of the given size through the router or router network.
+
+    With expect_block=True the ingress router should detect the sender's 
oversize
+    message and close the sender connection. The receiver may receive
+    aborted message indications but that is not guaranteed. If any aborted
+    messages are received then the count must be at most one.
+    The test is a success when the sender receives a connection error with
+    oversize indication and the receiver has not received too many aborts.
+
+    With expect_block=False sender messages should be received normally.
+    The test is a success when n_accepted == count.
+    """
+    def __init__(self, sender_host, receiver_host, test_address,
+                 message_size=100000, count=10, expect_block=True, 
print_to_console=False):
+        super(OversizeMessageTransferTest, self).__init__()
+        self.sender_host = sender_host
+        self.receiver_host = receiver_host
+        self.test_address = test_address
+        self.msg_size = message_size
+        self.count = count
+        self.expect_block = expect_block
+
+        self.sender_conn = None
+        self.receiver_conn = None
+        self.error = None
+        self.sender = None
+        self.receiver = None
+        self.proxy = None
+
+        self.n_sent = 0
+        self.n_rcvd = 0
+        self.n_accepted = 0
+        self.n_rejected = 0
+        self.n_aborted = 0
+        self.n_connection_error = 0
+        self.shut_down = False
+
+        self.logger = Logger(title=("OversizeMessageTransferTest - %s" % 
(self.test_address)), print_to_console=print_to_console)
+        self.log_unhandled = False
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_rejected=%d 
n_aborted=%d" % \
+                     (self.n_sent, self.n_rcvd, self.n_rejected, 
self.n_aborted)
+        self.logger.log("self.timeout " + self.error)
+        self._shut_down_test()
+
+    def on_start(self, event):
+        self.logger.log("on_start")
+        self.timer = event.reactor.schedule(10, Timeout(self))
+        self.logger.log("on_start: opening receiver connection to %s" % 
(self.receiver_host.addresses[0]))
+        self.receiver_conn = 
event.container.connect(self.receiver_host.addresses[0])
+        self.logger.log("on_start: opening   sender connection to %s" % 
(self.sender_host.addresses[0]))
+        self.sender_conn = 
event.container.connect(self.sender_host.addresses[0])
+        self.logger.log("on_start: Creating receiver")
+        self.receiver = event.container.create_receiver(self.receiver_conn, 
self.test_address)
+        self.logger.log("on_start: Creating sender")
+        self.sender = event.container.create_sender(self.sender_conn, 
self.test_address)
+        self.logger.log("on_start: done")
+
+    def send(self):
+        while self.sender.credit > 0 and self.n_sent < self.count:
+            # construct message in identifiable chunks
+            body_msg = ""
+            padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[self.n_sent % 30]
+            while len(body_msg) < self.msg_size:
+                chunk = "[%s:%d:%d" % (self.test_address, self.n_sent, 
len(body_msg))
+                padlen = 50 - len(chunk)
+                chunk += padchar * padlen
+                body_msg += chunk
+            if len(body_msg) > self.msg_size:
+                body_msg = body_msg[:self.msg_size]
+            self.logger.log("send. address:%s message:%d of %s length=%d" %
+                            (self.test_address, self.n_sent, self.count, 
self.msg_size))
+            m = Message(body=body_msg)
+            self.sender.send(m)
+            self.n_sent += 1
+
+    def on_sendable(self, event):
+        if event.sender == self.sender:
+            self.logger.log("on_sendable")
+            self.send()
+
+    def on_message(self, event):
+        if self.expect_block:
+            # All messages should violate maxMessageSize.
+            # Receiving any is an error.
+            self.error = "Received a message. Expected to receive no messages."
+            self.logger.log(self.error)
+            self._shut_down_test()
+        else:
+            self.n_rcvd += 1
+            self.accept(event.delivery)
+            self._check_done()
+
+    def on_connection_remote_close(self, event):
+        if self.shut_down:
+            return
+        if event.connection == self.sender_conn:
+            if not event.connection.remote_condition is None:
+                if event.connection.remote_condition.name == 
OVERSIZE_CONDITION_NAME and \
+                   event.connection.remote_condition.description == 
OVERSIZE_CONDITION_DESC:
+                    self.logger.log("on_connection_remote_close: sender closed 
with correct condition")
+                    self.n_connection_error += 1
+                    self.sender_conn.close()
+                    self.sender_conn = None
+                else:
+                    # sender closed but for wrong reason
+                    self.error = "sender close error: Expected name: %s, 
description: %s, but received name: %s, description: %s" % (
+                                 OVERSIZE_CONDITION_NAME, 
OVERSIZE_CONDITION_DESC,
+                                 event.connection.remote_condition.name, 
event.connection.remote_condition.description)
+                    self.logger.log(self.error)
+            else:
+                self.error = "sender close error: Expected a remote_condition 
but there was none."
+                self.logger.log(self.error)
+        else:
+            # connection error but not for sender
+            self.error = "unexpected connection close error: wrong connection 
closed."
+            self.logger.log(self.error)
+        self._check_done()
+
+    def _shut_down_test(self):
+        self.shut_down = True
+        if self.timer:
+            self.timer.cancel()
+            self.timer = None
+        if self.sender:
+            self.sender.close()
+            self.sender = None
+        if self.receiver:
+            self.receiver.close()
+            self.receiver = None
+        if self.sender_conn:
+            self.sender_conn.close()
+            self.sender_conn = None
+        if self.receiver_conn:
+            self.receiver_conn.close()
+            self.receiver_conn = None
+
+    def _check_done(self):
+        current = ("check_done: sent=%d rcvd=%d rejected=%d aborted=%d 
connection_error:%d" %
+                   (self.n_sent, self.n_rcvd, self.n_rejected, self.n_aborted, 
self.n_connection_error))
+        self.logger.log(current)
+        if self.error is not None:
+            self.logger.log("TEST FAIL")
+            self._shut_down_test()
+        else:
+            done = (self.n_connection_error == 1) \
+                    if self.expect_block else \
+                    (self.n_sent == self.count and self.n_rcvd == self.count)
+
+            if done:
+                self.logger.log("TEST DONE!!!")
+                # self.log_unhandled = True # verbose debugging
+                self._shut_down_test()
+
+    def on_rejected(self, event):
+        self.n_rejected += 1
+        if self.expect_block:
+            self.logger.log("on_rejected: entry")
+            self._check_done()
+        else:
+            self.error = "Unexpected on_reject"
+            self.logger.log(self.error)
+            self._check_done()
+
+    def on_aborted(self, event):
+        self.logger.log("on_aborted")
+        self.n_aborted += 1
+        self._check_done()
+
+    def on_error(self, event):
+        self.error = "Container error"
+        self.logger.log(self.error)
+        self._shut_down_test()
+
+    def on_unhandled(self, method, *args):
+        if self.log_unhandled:
+            self.logger.log("on_unhandled: method: %s, args: %s" % (method, 
args))
+
+    def run(self):
+        try:
+            Container(self).run()
+        except Exception as e:
+            self.error = "Container run exception: %s" % (e)
+            self.logger.log(self.error)
+            self.logger.dump()
+
+
+
+# For the next test case define max sizes for each router.
+# These are the configured maxMessageSize values
+EA1_MAX_SIZE = 50000
+INTA_MAX_SIZE = 100000
+INTB_MAX_SIZE = 150000
+EB1_MAX_SIZE = 200000
+
+# Interior routers enforce max size directly.
+# Edge routers are also checked by the attached interior router.
+
+# Block tests that use edge routers that send messages to the network must
+# account for the fact that the attached interior router will apply
+# another max size. These tests do not check against EB1 max for the
+# sender if the receiver is on EA1, INTA, or INTB since INTB's max
+# would kick in and cause a false positive.
+
+# Tests that check for allowing near-max sizes use the minimum of
+# the edge router's max and the attached interior router's max.
+
+# The bytes-over and bytes-under max that should trigger allow or deny.
+# Messages with content this much over should be blocked while
+# messages with content this much under should be allowed.
+# * client overhead is typically 16 bytes or so
+# * interrouter overhead is much larger with annotations
+OVER_UNDER = 200
+
+
+class MaxMessageSizeBlockOversize(TestCase):
+    """
+    verify that maxMessageSize blocks oversize messages
+    """
+    @classmethod
+    def setUpClass(cls):
+        """Start the router"""
+        super(MaxMessageSizeBlockOversize, cls).setUpClass()
+
+        def router(name, mode, max_size, extra):
+            config = [
+                ('router', {'mode': mode,
+                            'id': name,
+                            'allowUnsettledMulticast': 'yes',
+                            'workerThreads': W_THREADS}),
+                ('listener', {'role': 'normal',
+                              'port': cls.tester.get_port()}),
+                ('address', {'prefix': 'multicast', 'distribution': 
'multicast'}),
+                ('policy', {'maxConnections': 100, 'enableVhostPolicy': 
'true', 'maxMessageSize': max_size, 'defaultVhost': '$default'}),
+                ('vhost', {'hostname': '$default', 'allowUnknownUser': 'true',
+                    'groups': [(
+                        '$default', {
+                            'users': '*',
+                            'maxConnections': 100,
+                            'remoteHosts': '*',
+                            'sources': '*',
+                            'targets': '*',
+                            'allowAnonymousSender': 'true',
+                            'allowWaypointLinks': 'true',
+                            'allowDynamicSource': 'true'
+                        }
+                    )]}
+                )
+            ]
+
+            if extra:
+                config.extend(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+            return cls.routers[-1]
+
+        # configuration:
+        # two edge routers connected via 2 interior routers with max sizes
+        #
+        #  +-------+    +---------+    +---------+    +-------+
+        #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
+        #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
+        #  +-------+    +---------+    +---------+    +-------+
+        #
+        # Note:
+        #  * Messages whose senders connect to INT.A or INT.B are subject to 
max message size
+        #    defined for the ingress router only.
+        #  * Messages whose senders connect to EA1 or EB1 are subject to max 
message size
+        #    defined for the ingress router. If the message is forwarded 
through the
+        #    connected interior router then the message is subject to another 
max message size
+        #    defined by the interior router.
+
+        cls.routers = []
+
+        interrouter_port = cls.tester.get_port()
+        cls.INTA_edge_port   = cls.tester.get_port()
+        cls.INTB_edge_port   = cls.tester.get_port()
+
+        router('INT.A', 'interior', INTA_MAX_SIZE,
+               [('listener', {'role': 'inter-router',
+                              'port': interrouter_port}),
+                ('listener', {'role': 'edge', 'port': cls.INTA_edge_port})])
+        cls.INT_A = cls.routers[0]
+        cls.INT_A.listener = cls.INT_A.addresses[0]
+
+        router('INT.B', 'interior', INTB_MAX_SIZE,
+               [('connector', {'name': 'connectorToA',
+                               'role': 'inter-router',
+                               'port': interrouter_port}),
+                ('listener', {'role': 'edge',
+                              'port': cls.INTB_edge_port})])
+        cls.INT_B = cls.routers[1]
+        cls.INT_B.listener = cls.INT_B.addresses[0]
+
+        router('EA1', 'edge', EA1_MAX_SIZE,
+               [('listener', {'name': 'rc', 'role': 'route-container',
+                              'port': cls.tester.get_port()}),
+                ('connector', {'name': 'uplink', 'role': 'edge',
+                               'port': cls.INTA_edge_port})])
+        cls.EA1 = cls.routers[2]
+        cls.EA1.listener = cls.EA1.addresses[0]
+
+        router('EB1', 'edge', EB1_MAX_SIZE,
+               [('connector', {'name': 'uplink',
+                               'role': 'edge',
+                               'port': cls.INTB_edge_port,
+                               'maxFrameSize': 1024}),
+                ('listener', {'name': 'rc', 'role': 'route-container',
+                              'port': cls.tester.get_port()})])
+        cls.EB1 = cls.routers[3]
+        cls.EB1.listener = cls.EB1.addresses[0]
+
+        cls.INT_A.wait_router_connected('INT.B')
+        cls.INT_B.wait_router_connected('INT.A')
+        cls.EA1.wait_connectors()
+        cls.EB1.wait_connectors()
+
+    def test_40_block_oversize_INTA_INTA(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A,
+                                           MaxMessageSizeBlockOversize.INT_A,
+                                           "e40",
+                                           message_size=INTA_MAX_SIZE + 
OVER_UNDER,
+                                           expect_block=True,
+                                           print_to_console=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_40 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_41_block_oversize_INTA_INTB(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A,
+                                           MaxMessageSizeBlockOversize.INT_B,
+                                           "e41",
+                                           message_size=INTA_MAX_SIZE + 
OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_41 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_42_block_oversize_INTA_EA1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A,
+                                           MaxMessageSizeBlockOversize.EA1,
+                                           "e42",
+                                           message_size=INTA_MAX_SIZE + 
OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_42 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_43_block_oversize_INTA_EB1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A,
+                                           MaxMessageSizeBlockOversize.EB1,
+                                           "e43",
+                                           message_size=INTA_MAX_SIZE + 
OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_43 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_44_block_oversize_INTB_INTA(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B,
+                                           MaxMessageSizeBlockOversize.INT_A,
+                                           "e44",
+                                           message_size=INTB_MAX_SIZE + 
OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_44 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_45_block_oversize_INTB_INTB(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B,
+                                           MaxMessageSizeBlockOversize.INT_B,
+                                           "e45",
+                                           message_size=INTB_MAX_SIZE + 
OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_45 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_46_block_oversize_INTB_EA1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B,
+                                           MaxMessageSizeBlockOversize.EA1,
+                                           "e46",
+                                           message_size=INTB_MAX_SIZE + 
OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_46 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_47_block_oversize_INTB_EB1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B,
+                                           MaxMessageSizeBlockOversize.EB1,
+                                           "e47",
+                                           message_size=INTB_MAX_SIZE + 
OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_47 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_48_block_oversize_EA1_INTA(self):
+        if EA1_MAX_SIZE >= INTA_MAX_SIZE:
+            self.skipTest("EA1 sending to INT.A may be blocked by EA1 limit 
and also by INT.A limit. That condition is tested in compound test.")
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1,
+                                           MaxMessageSizeBlockOversize.INT_A,
+                                           "e48",
+                                           message_size=EA1_MAX_SIZE + 
OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_48 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_49_block_oversize_EA1_INTB(self):
+        if EA1_MAX_SIZE >= INTA_MAX_SIZE:
+            self.skipTest("EA1 sending to INT.B may be blocked by EA1 limit 
and also by INT.A limit. That condition is tested in compound test.")
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1,
+                                           MaxMessageSizeBlockOversize.INT_B,
+                                           "e49",
+                                           message_size=EA1_MAX_SIZE + 
OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_49 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_4a_block_oversize_EA1_EA1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1,
+                                           MaxMessageSizeBlockOversize.EA1,
+                                           "e4a",
+                                           message_size=EA1_MAX_SIZE + OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_4a test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_4b_block_oversize_EA1_EB1(self):
+        if EA1_MAX_SIZE >= INTA_MAX_SIZE:
+            self.skipTest("EA1 sending to EB1 may be blocked by EA1 limit and also by INT.A limit. That condition is tested in compound test.")
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1,
+                                           MaxMessageSizeBlockOversize.EB1,
+                                           "e4b",
+                                           message_size=EA1_MAX_SIZE + OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_4b test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_4c_block_oversize_EB1_INTA(self):
+        if EB1_MAX_SIZE > INTB_MAX_SIZE:
+            self.skipTest("EB1 sending to INT.A may be blocked by EB1 limit and also by INT.B limit. That condition is tested in compound test.")
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1,
+                                           MaxMessageSizeBlockOversize.INT_A,
+                                           "e4c",
+                                           message_size=EB1_MAX_SIZE + OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_4c test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_4d_block_oversize_EB1_INTB(self):
+        if EB1_MAX_SIZE > INTB_MAX_SIZE:
+            self.skipTest("EB1 sending to INT.B may be blocked by EB1 limit and also by INT.B limit. That condition is tested in compound test.")
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1,
+                                           MaxMessageSizeBlockOversize.INT_B,
+                                           "e4d",
+                                           message_size=EB1_MAX_SIZE + OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_4d test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_4e_block_oversize_EB1_EA1(self):
+        if EB1_MAX_SIZE > INTB_MAX_SIZE:
+            self.skipTest("EB1 sending to EA1 may be blocked by EB1 limit and also by INT.B limit. That condition is tested in compound test.")
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1,
+                                           MaxMessageSizeBlockOversize.EA1,
+                                           "e4e",
+                                           message_size=EB1_MAX_SIZE + OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_4e test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_4f_block_oversize_EB1_EB1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1,
+                                           MaxMessageSizeBlockOversize.EB1,
+                                           "e4f",
+                                           message_size=EB1_MAX_SIZE + OVER_UNDER,
+                                           expect_block=True)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_4f test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    #
+    # tests under maxMessageSize should not block
+    #
+    def test_50_allow_undersize_INTA_INTA(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A,
+                                           MaxMessageSizeBlockOversize.INT_A,
+                                           "e50",
+                                           message_size=INTA_MAX_SIZE - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_50 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_51_allow_undersize_INTA_INTB(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A,
+                                           MaxMessageSizeBlockOversize.INT_B,
+                                           "e51",
+                                           message_size=INTA_MAX_SIZE - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_51 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_52_allow_undersize_INTA_EA1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A,
+                                           MaxMessageSizeBlockOversize.EA1,
+                                           "e52",
+                                           message_size=INTA_MAX_SIZE - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_52 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_53_allow_undersize_INTA_EB1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A,
+                                           MaxMessageSizeBlockOversize.EB1,
+                                           "e53",
+                                           message_size=INTA_MAX_SIZE - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_53 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_54_allow_undersize_INTB_INTA(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B,
+                                           MaxMessageSizeBlockOversize.INT_A,
+                                           "e54",
+                                           message_size=INTB_MAX_SIZE - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_54 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_55_allow_undersize_INTB_INTB(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B,
+                                           MaxMessageSizeBlockOversize.INT_B,
+                                           "e55",
+                                           message_size=INTB_MAX_SIZE - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_55 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_56_allow_undersize_INTB_EA1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B,
+                                           MaxMessageSizeBlockOversize.EA1,
+                                           "e56",
+                                           message_size=INTB_MAX_SIZE - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_56 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_57_allow_undersize_INTB_EB1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B,
+                                           MaxMessageSizeBlockOversize.EB1,
+                                           "e57",
+                                           message_size=INTB_MAX_SIZE - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_57 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_58_allow_undersize_EA1_INTA(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1,
+                                           MaxMessageSizeBlockOversize.INT_A,
+                                           "e58",
+                                           message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_58 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+
+    def test_59_allow_undersize_EA1_INTB(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1,
+                                           MaxMessageSizeBlockOversize.INT_B,
+                                           "e59",
+                                           message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_59 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+
+    def test_5a_allow_undersize_EA1_EA1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1,
+                                           MaxMessageSizeBlockOversize.EA1,
+                                           "e5a",
+                                           message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_5a test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+
+    def test_5b_allow_undersize_EA1_EB1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1,
+                                           MaxMessageSizeBlockOversize.EB1,
+                                           "e5b",
+                                           message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_5b test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+    def test_5c_allow_undersize_EB1_INTA(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1,
+                                           MaxMessageSizeBlockOversize.INT_A,
+                                           "e5c",
+                                           message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_5c test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+
+    def test_5d_allow_undersize_EB1_INTB(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1,
+                                           MaxMessageSizeBlockOversize.INT_B,
+                                           "e5d",
+                                           message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_5d test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+
+    def test_5e_allow_undersize_EB1_EA1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1,
+                                           MaxMessageSizeBlockOversize.EA1,
+                                           "e5e",
+                                           message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_5e test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+
+    def test_5f_allow_undersize_EB1_EB1(self):
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1,
+                                           MaxMessageSizeBlockOversize.EB1,
+                                           "e5f",
+                                           message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER,
+                                           expect_block=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_5f test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())
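
The size selection used by the tests above can be sketched as follows. This is
a minimal illustration and not part of the commit: the helper names are
hypothetical, and the limit values are placeholders assumed for the example
rather than the constants defined in the test module.

```python
# Placeholder limits (assumed for illustration; the test module defines its own).
EA1_MAX_SIZE = 50000    # edge router EA1
INTA_MAX_SIZE = 100000  # interior router INT.A, the uplink of EA1
OVER_UNDER = 200        # margin above or below a limit


def oversize_for(ingress_limit, margin=OVER_UNDER):
    """Return a body size that the ingress router is expected to block."""
    return ingress_limit + margin


def undersize_for(edge_limit, uplink_limit, margin=OVER_UNDER):
    """Return a body size that stays under both the edge and uplink limits."""
    return min(edge_limit, uplink_limit) - margin


def edge_only_block(edge_limit, uplink_limit):
    """Mirror the EA1 skipTest guard: run only when the edge limit is strictly smaller."""
    return edge_limit < uplink_limit


if __name__ == "__main__":
    print(oversize_for(EA1_MAX_SIZE))
    print(undersize_for(EA1_MAX_SIZE, INTA_MAX_SIZE))
    print(edge_only_block(EA1_MAX_SIZE, INTA_MAX_SIZE))
```

Taking the minimum of the two limits for the undersize case reflects that a
message entering through an edge router may be checked again by the attached
interior router, so it has to clear both limits to be delivered.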
diff --git a/tests/system_tests_policy_oversize_compound.py b/tests/system_tests_policy_oversize_compound.py
new file mode 100644
index 0000000..35da8b5
--- /dev/null
+++ b/tests/system_tests_policy_oversize_compound.py
@@ -0,0 +1,1058 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest as unittest
+import os, json, re, signal
+import sys
+import time
+
+from system_test import TestCase, Qdrouterd, main_module, Process, TIMEOUT, DIR, QdManager, Logger
+from subprocess import PIPE, STDOUT
+from proton import ConnectionException, Timeout, Url, symbol, Message
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, ReceiverOption
+from proton.utils import BlockingConnection, LinkDetached, SyncRequestResponse
+from qpid_dispatch_internal.policy.policy_util import is_ipv6_enabled
+from qpid_dispatch_internal.compat import dict_iteritems
+from test_broker import FakeBroker
+
+# How many worker threads?
+W_THREADS = 2
+
+# Define oversize denial condition
+OVERSIZE_CONDITION_NAME = "amqp:connection:forced"
+OVERSIZE_CONDITION_DESC = "Message size exceeded"
+
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+# For the next test case define max sizes for each router.
+# These are the configured maxMessageSize values
+EA1_MAX_SIZE = 50000
+INTA_MAX_SIZE = 100000
+INTB_MAX_SIZE = 150000
+EB1_MAX_SIZE = 200000
+
+# Interior routers check for max message size at message ingress over all connections
+#   except interrouter connections.
+# Edge routers check for max message size at message ingress over all connections
+#   except interior router connections.
+# Edge routers may check a max message size and allow the message to be delivered
+#   to destinations on that edge router. However, if the message is passed to an
+#   interior router then the message is subject to the interior router's max size
+#   before the message is forwarded by the interior router network.
+
+# The bytes-over and bytes-under max that should trigger allow or deny.
+# Messages with content this much over should be blocked while
+# messages with content this much under should be allowed.
+# * client overhead is typically 16 bytes or so
+# * interrouter overhead is much larger with annotations
+OVER_UNDER = 200
+
+# Alert:
+# This module has two large classes that are laid out about the same:
+#    OversizeMessageTransferTest
+#    OversizeMulticastTransferTest
+# The MessageTransfer test does a single sender and single receiver while
+# the MulticastTransfer test does a single sender and four receivers.
+# Much of the logic between tests is duplicated. Remember to fix things in both tests.
+
+class OversizeMessageTransferTest(MessagingHandler):
+    """
+    This test connects a sender and a receiver. Then it tries to send _count_
+    number of messages of the given size through the router or router network.
+
+    Messages either pass through an edge router and get blocked by an interior
+    router, or they are blocked by both the edge and the interior routers.
+
+    When 'blocked_by_both' is false then:
+
+    * The ingress router should allow the sender's oversize message.
+    * The message is blocked by the uplink router by rejecting the message
+    and closing the connection between the interior and edge routers.
+    * The receiver may receive aborted message indications but that is
+    not guaranteed.
+    * If any aborted messages are received then the count must be at most one.
+
+    When 'blocked_by_both' is true then:
+
+    * The ingress edge router will reject and close the connection on the first message.
+    * The second message may be aborted because the connection between the
+    edge router and the interior router was closed.
+    * The remainder of the messages are going into a closed connection and
+    will receive no settlement.
+    """
+    def __init__(self, test_class, sender_host, receiver_host, test_address,
+                 message_size=100000, count=10, blocked_by_both=False, print_to_console=False):
+        """
+        Construct an instance of the unicast test
+        :param test_class:    test class - has wait-connection function
+        :param sender_host:   router for sender connection
+        :param receiver_host: router for receiver connection
+        :param test_address:  sender/receiver AMQP address
+        :param message_size:  in bytes
+        :param count:         how many messages to send
+        :param blocked_by_both:  true if edge router messages are also blocked by interior
+        :param print_to_console: print logs as they happen
+        """
+        super(OversizeMessageTransferTest, self).__init__()
+        self.test_class = test_class
+        self.sender_host = sender_host
+        self.receiver_host = receiver_host
+        self.test_address = test_address
+        self.msg_size = message_size
+        self.count = count
+        self.blocked_by_both = blocked_by_both
+        self.expect_block = True
+        self.messages = []
+
+        self.sender_conn = None
+        self.receiver_conn = None
+        self.error = None
+        self.sender = None
+        self.receiver = None
+        self.proxy = None
+
+        self.network_stable = False
+        self.n_sent = 0
+        self.n_rcvd = 0
+        self.n_accepted = 0
+        self.n_rejected = 0
+        self.n_modified = 0
+        self.n_released = 0
+        self.n_send_settled = 0
+        self.n_aborted = 0
+        self.n_connection_error = 0
+        self.shut_down = False
+
+        self.logger = Logger(title=("OversizeMessageTransferTest - %s" % (self.test_address)), print_to_console=print_to_console)
+        self.log_unhandled = False # verbose diagnostics of proton callbacks
+
+    def timeout(self):
+        current = ("check_done: sent=%d rcvd=%d rejected=%d aborted=%d connection_error:%d send_settled:%d" %
+                   (self.n_sent, self.n_rcvd, self.n_rejected, self.n_aborted, self.n_connection_error, self.n_send_settled))
+        self.error = "Timeout Expired " + current
+        self.logger.log("self.timeout " + self.error)
+        self._shut_down_test()
+
+    def on_start(self, event):
+        self.logger.log("on_start")
+
+        self.logger.log("on_start: scheduling reactor timeout")
+        self.timer = event.reactor.schedule(10, Timeout(self))
+
+        self.logger.log("Waiting for router network to stabilize")
+        self.test_class.wait_router_network_connected()
+        self.network_stable = True
+
+        self.logger.log("on_start: generating messages")
+        for idx in range(self.count):
+            # construct message in identifiable chunks
+            body_msg = ""
+            padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30]
+            while len(body_msg) < self.msg_size:
+                chunk = "[%s:%d:%d" % (self.test_address, idx, len(body_msg))
+                padlen = 50 - len(chunk)
+                chunk += padchar * padlen
+                body_msg += chunk
+            if len(body_msg) > self.msg_size:
+                body_msg = body_msg[:self.msg_size]
+            m = Message(body=body_msg)
+            self.messages.append(m)
+
+        self.logger.log("on_start: opening receiver connection to %s" % (self.receiver_host.addresses[0]))
+        self.receiver_conn = event.container.connect(self.receiver_host.addresses[0])
+
+        self.logger.log("on_start: Creating receiver")
+        self.receiver = event.container.create_receiver(self.receiver_conn, self.test_address)
+
+        self.logger.log("on_start: opening   sender connection to %s" % (self.sender_host.addresses[0]))
+        self.sender_conn = event.container.connect(self.sender_host.addresses[0])
+
+        self.logger.log("on_start: Creating sender")
+        self.sender = event.container.create_sender(self.sender_conn, self.test_address)
+
+        self.logger.log("on_start: done")
+
+    def send(self):
+        while self.sender.credit > 0 and self.n_sent < self.count:
+            m = self.messages[self.n_sent]
+            self.logger.log("send. address:%s message:%d of %s length=%d" % (
+                            self.test_address, self.n_sent, self.count, self.msg_size))
+            self.sender.send(m)
+            self.n_sent += 1
+        #if self.n_sent == self.count:
+        #    self.log_unhandled = True
+
+    def on_sendable(self, event):
+        if event.sender == self.sender:
+            self.logger.log("on_sendable")
+            self.send()
+
+    def on_message(self, event):
+        self.logger.log("on_message: entry")
+        if self.expect_block:
+            # All messages should violate maxMessageSize.
+            # Receiving any is an error.
+            self.error = "Received a message. Expected to receive no messages."
+            self.logger.log(self.error)
+            self._shut_down_test()
+        else:
+            self.n_rcvd += 1
+            self.accept(event.delivery)
+            self._check_done()
+
+    def on_connection_remote_close(self, event):
+        if self.shut_down:
+            return
+        if event.connection == self.sender_conn:
+            if event.connection.remote_condition is not None:
+                if event.connection.remote_condition.name == OVERSIZE_CONDITION_NAME and \
+                   event.connection.remote_condition.description == OVERSIZE_CONDITION_DESC:
+                    self.logger.log("on_connection_remote_close: sender closed with correct condition")
+                    self.n_connection_error += 1
+                    self.sender_conn.close()
+                    self.sender_conn = None
+                else:
+                    # sender closed but for wrong reason
+                    self.error = "sender close error: Expected name: %s, description: %s, but received name: %s, description: %s" % (
+                                 OVERSIZE_CONDITION_NAME, OVERSIZE_CONDITION_DESC,
+                                 event.connection.remote_condition.name, event.connection.remote_condition.description)
+                    self.logger.log(self.error)
+            else:
+                self.error = "sender close error: Expected a remote_condition but there was none."
+                self.logger.log(self.error)
+        else:
+            # connection error but not for sender
+            self.error = "unexpected connection close error: wrong connection closed."
+            self.logger.log(self.error)
+        self._check_done()
+
+    def _shut_down_test(self):
+        self.shut_down = True
+        if self.timer:
+            self.timer.cancel()
+            self.timer = None
+        if self.sender:
+            self.sender.close()
+            self.sender = None
+        if self.receiver:
+            self.receiver.close()
+            self.receiver = None
+        if self.sender_conn:
+            self.sender_conn.close()
+            self.sender_conn = None
+        if self.receiver_conn:
+            self.receiver_conn.close()
+            self.receiver_conn = None
+
+    def _current(self):
+        return ("net_stable=%s sent=%d rcvd=%d rejected=%d aborted=%d connection_error:%d send_settled:%d" %
+                   (self.network_stable, self.n_sent, self.n_rcvd, self.n_rejected, self.n_aborted, self.n_connection_error, self.n_send_settled))
+
+    def _check_done(self):
+        self.logger.log("check_done: " + self._current())
+        if self.error is not None:
+            self.logger.log("TEST FAIL")
+            self._shut_down_test()
+        else:
+            if not self.blocked_by_both:
+                # Blocked by interior only. Connection to edge stays up
+                # and all messages must be accounted for.
+                done = self.n_rejected == 1 and \
+                       self.n_send_settled == self.count
+            else:
+                # Blocked by interior and edge. Expect edge connection to go down
+                # and some of our messages arrive at edge after it has sent
+                # AMQP close. Those messages are never settled. TODO: Is that OK?
+                done = self.n_rejected == 1 and \
+                       self.n_connection_error == 1
+            if done:
+                self.logger.log("TEST DONE!!!")
+                # self.log_unhandled = True # verbose debugging
+                self._shut_down_test()
+
+    def on_rejected(self, event):
+        self.n_rejected += 1
+        if self.expect_block:
+            self.logger.log("on_rejected: entry")
+            self._check_done()
+        else:
+            self.error = "Unexpected on_reject"
+            self.logger.log(self.error)
+            self._check_done()
+
+    def on_aborted(self, event):
+        self.logger.log("on_aborted")
+        self.n_aborted += 1
+        self._check_done()
+
+    def on_settled(self, event):
+        self.logger.log("on_settled")
+        if event.connection == self.sender_conn:
+            self.logger.log("on_settled: sender connection")
+            self.n_send_settled += 1
+        self._check_done()
+
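+    def on_error(self, event):
+        self.error = "Container error"
+        self.logger.log(self.error)
+        # Either connection may already have been closed and set to None.
+        if self.sender_conn:
+            self.sender_conn.close()
+        if self.receiver_conn:
+            self.receiver_conn.close()
+        if self.timer:
+            self.timer.cancel()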
+
+    def on_link_error(self, event):
+        self.error = event.link.remote_condition.name
+        self.logger.log("on_link_error: %s" % (self.error))
+        # Link errors may prevent normal test shutdown so don't even try.
+        raise Exception(self.error)
+
+    def on_reactor_final(self, event):
+        self.logger.log("on_reactor_final:")
+
+    def on_unhandled(self, method, *args):
+        if self.log_unhandled:
+            self.logger.log("on_unhandled: method: %s, args: %s" % (method, args))
+
+    def run(self):
+        try:
+            Container(self).run()
+        except Exception as e:
+            self.error = "Container run exception: %s" % (e)
+            self.logger.log(self.error)
+            self.logger.dump()
+        time.sleep(0.2)
+
+#
+# DISPATCH-975 Detect that an oversize message is blocked.
+# These tests check simple and compound blocking for multicast messages.
+#
+# Indexes into router arrays for receivers and receiver stats
+IDX_INTA = 0
+IDX_INTB = 1
+IDX_EA1 = 2
+IDX_EB1 = 3
+
+class OversizeMulticastTransferTest(MessagingHandler):
+    """
+    This test connects a sender and four receivers. Then it tries to send _count_
+    number of messages of the given size through the router or router network.
+    """
+    def __init__(self, test_class, sender_host, routers, test_address, expect_receives,
+                 blocked_by_ingress, blocked_by_interior,
+                 message_size=100000, count=10, print_to_console=False):
+        """
+        Construct an instance of the multicast test
+        :param test_class:    test class - has wait-connection function
+        :param sender_host:         router for the sender connection
+        :param routers:             a list of all the routers for receiver connections
+        :param test_address:        sender/receiver AMQP address
+        :param expect_receives:     array of expected receive counts
+        :param blocked_by_ingress:  true if ingress router blocks
+        :param blocked_by_interior: true if edge router messages are also blocked by interior
+        :param message_size:        in bytes
+        :param count:               how many messages to send
+        :param print_to_console:    print logs as they happen
+        """
+        super(OversizeMulticastTransferTest, self).__init__()
+        self.test_class = test_class
+        self.sender_host = sender_host
+        self.routers = routers
+        self.test_address = test_address
+        self.msg_size = message_size
+        self.count = count
+        self.expect_receives = expect_receives # router array
+        self.blocked_by_ingress = blocked_by_ingress
+        self.blocked_by_interior = blocked_by_interior
+        self.messages = []
+
+        self.sender_conn = None
+        self.receiver_conns = [None, None, None, None] # router array
+        self.error = None
+        self.sender = None
+        self.receivers = [None, None, None, None] # router array
+        self.proxy = None
+
+        self.network_stable = False
+        self.n_sent = 0
+        self.n_rcvds = [0, 0, 0, 0] # router array
+        self.n_accepted = 0
+        self.n_rejected = 0
+        self.n_modified = 0
+        self.n_released = 0
+        self.n_send_settled = 0
+        self.n_aborteds = [0, 0, 0, 0] # router array
+        self.n_connection_error = 0
+        self.shut_down = False
+
+        self.logger = Logger(title=("OversizeMulticastTransferTest - %s" % (self.test_address)), print_to_console=print_to_console)
+        self.log_unhandled = False # verbose diagnostics of proton callbacks
+
+    def timeout(self):
+        current = self._current()
+        self.error = "Timeout Expired " + current
+        self.logger.log("self.timeout " + self.error)
+        self._shut_down_test()
+
+    def on_start(self, event):
+        self.logger.log("on_start")
+
+        self.logger.log("on_start: scheduling reactor timeout")
+        self.timer = event.reactor.schedule(10, Timeout(self))
+
+        self.logger.log("Waiting for router network to stabilize")
+        self.test_class.wait_router_network_connected()
+        self.network_stable = True
+
+        for idx in [IDX_INTA, IDX_INTB, IDX_EA1, IDX_EB1]:
+            self.logger.log("on_start: opening receiver connection to %s" % (self.routers[idx].addresses[0]))
+            self.receiver_conns[idx] = event.container.connect(self.routers[idx].addresses[0])
+        for idx in [IDX_INTA, IDX_INTB, IDX_EA1, IDX_EB1]:
+            self.logger.log("on_start: Creating receiver %d" % idx)
+            self.receivers[idx] = event.container.create_receiver(self.receiver_conns[idx], self.test_address)
+
+        self.logger.log("on_start: generating messages")
+        for idx in range(self.count):
+            # construct message in identifiable chunks
+            body_msg = ""
+            padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30]
+            while len(body_msg) < self.msg_size:
+                chunk = "[%s:%d:%d" % (self.test_address, idx, len(body_msg))
+                padlen = 50 - len(chunk)
+                chunk += padchar * padlen
+                body_msg += chunk
+            if len(body_msg) > self.msg_size:
+                body_msg = body_msg[:self.msg_size]
+            m = Message(body=body_msg)
+            self.messages.append(m)
+
+        self.logger.log("on_start: opening   sender connection to %s" % (self.sender_host.addresses[0]))
+        self.sender_conn = event.container.connect(self.sender_host.addresses[0])
+
+        self.logger.log("on_start: Creating sender")
+        self.sender = event.container.create_sender(self.sender_conn, self.test_address)
+
+        self.logger.log("on_start: done")
+
+    def rcvr_idx_of(self, rcvr):
+        """
+        Given a receiver, as in event.receiver, return
+        the router array index of that receiver's router
+        :param rcvr:
+        :return: integer index of receiver
+        """
+        for idx in [IDX_INTA, IDX_INTB, IDX_EA1, IDX_EB1]:
+            if rcvr == self.receivers[idx]:
+                return idx
+        self.error = "Receiver not found in receivers array."
+        self.logger.log(self.error)
+        self.logger.dump()
+        self._shut_down_test()
+        raise Exception(self.error)
+
+
+    def send(self):
+        while self.sender.credit > 0 and self.n_sent < self.count:
+            m = self.messages[self.n_sent]
+            self.logger.log("send. address:%s message:%d of %s length=%d" % (
+                            self.test_address, self.n_sent, self.count, self.msg_size))
+            self.sender.send(m)
+            self.n_sent += 1
+        #if self.n_sent == self.count:
+        #    self.log_unhandled = True
+
+    def on_sendable(self, event):
+        if event.sender == self.sender:
+            self.logger.log("on_sendable")
+            self.send()
+
+    def on_message(self, event):
+        self.logger.log("on_message")
+        if self.shut_down:
+            return
+        idx = self.rcvr_idx_of(event.receiver)
+        if self.expect_receives[idx] == 0:
+            # Receiving any is an error.
+            self.error = "Received a message. Expected to receive no messages."
+            self.logger.log(self.error)
+            self._shut_down_test()
+        else:
+            self.n_rcvds[idx] += 1
+            self.accept(event.delivery)
+            self._check_done()
+
+    def on_connection_remote_close(self, event):
+        if self.shut_down:
+            return
+        if event.connection == self.sender_conn:
+            if event.connection.remote_condition is not None:
+                if event.connection.remote_condition.name == OVERSIZE_CONDITION_NAME and \
+                   event.connection.remote_condition.description == OVERSIZE_CONDITION_DESC:
+                    self.logger.log("on_connection_remote_close: sender closed with correct condition")
+                    self.n_connection_error += 1
+                    self.sender_conn.close()
+                    self.sender_conn = None
+                else:
+                    # sender closed but for wrong reason
+                    self.error = "sender close error: Expected name: %s, description: %s, but received name: %s, description: %s" % (
+                                 OVERSIZE_CONDITION_NAME, OVERSIZE_CONDITION_DESC,
+                                 event.connection.remote_condition.name, event.connection.remote_condition.description)
+                    self.logger.log(self.error)
+            else:
+                self.error = "sender close error: Expected a remote_condition but there was none."
+                self.logger.log(self.error)
+        else:
+            # connection error but not for sender
+            self.error = "unexpected connection close error: wrong connection closed."
+            self.logger.log(self.error)
+        self._check_done()
+
+    def _shut_down_test(self):
+        self.shut_down = True
+        if self.timer:
+            self.timer.cancel()
+            self.timer = None
+        if self.sender:
+            self.sender.close()
+            self.sender = None
+        for idx in [IDX_INTA, IDX_INTB, IDX_EA1, IDX_EB1]:
+            if self.receivers[idx]:
+                self.receivers[idx].close()
+                self.receivers[idx] = None
+        if self.sender_conn:
+            self.sender_conn.close()
+            self.sender_conn = None
+        for idx in [IDX_INTA, IDX_INTB, IDX_EA1, IDX_EB1]:
+            if self.receiver_conns[idx]:
+                self.receiver_conns[idx].close()
+                self.receiver_conns[idx] = None
+
+    def _current(self):
+        return ("net_stable:%s sent=%d rcvd=%s rejected=%d aborted=%s connection_error:%d send_settled:%d" %
+               (self.network_stable, self.n_sent, str(self.n_rcvds), self.n_rejected, str(self.n_aborteds), self.n_connection_error, self.n_send_settled))
+
+    def _check_done(self):
+        self.logger.log("check_done: " + self._current())
+        if self.error is not None:
+            self.logger.log("TEST FAIL")
+            self._shut_down_test()
+        else:
+            if self.blocked_by_interior:
+                if self.blocked_by_ingress:
+                    # Blocked by interior and edge. Expect edge connection to go down
+                    # and some of our messages arrive at edge after it has sent
+                    # AMQP close. Those messages are never settled. TODO: Is that OK?
+                    done = self.n_rejected == 1 and \
+                           self.n_connection_error == 1
+                else:
+                    # Blocked by interior only. Connection to edge stays up
+                    # and all messages must be accounted for.
+                    all_received = True
+                    for idx in [IDX_INTA, IDX_INTB, IDX_EA1, IDX_EB1]:
+                        if self.expect_receives[idx] > 0:
+                            if self.n_rcvds[idx] != self.expect_receives[idx]:
+                                all_received = False
+                    done = self.n_rejected <= 1 and \
+                           self.n_send_settled == self.count and \
+                           all_received
+            else:
+                # Blocked by edge should never deliver to interior
+                done = self.n_rejected == 1 and \
+                       self.n_connection_error == 1
+
+            if done:
+                self.logger.log("TEST DONE!!!")
+                # self.log_unhandled = True # verbose debugging
+                self._shut_down_test()
+
+    def on_rejected(self, event):
+        self.n_rejected += 1
+        if self.reject:
+            self.logger.log("on_rejected: entry")
+            self._check_done()
+        else:
+            self.error = "Unexpected on_reject"
+            self.logger.log(self.error)
+            self._check_done()
+
+    def on_aborted(self, event):
+        self.logger.log("on_aborted")
+        if self.shut_down:
+            return
+        idx = self.rcvr_idx_of(event.receiver)
+        self.n_aborteds[idx] += 1
+        self._check_done()
+
+    def on_settled(self, event):
+        self.logger.log("on_settled")
+        if event.connection == self.sender_conn:
+            self.logger.log("on_settled: sender connection")
+            self.n_send_settled += 1
+        self._check_done()
+
+    def on_error(self, event):
+        self.error = "Container error"
+        self.logger.log(self.error)
+        self._shut_down_test()
+
+    def on_link_error(self, event):
+        self.error = event.link.remote_condition.name
+        self.logger.log("on_link_error: %s" % (self.error))
+        # Link errors may prevent normal test shutdown so don't even try.
+        raise Exception(self.error)
+
+    def on_reactor_final(self, event):
+        self.logger.log("on_reactor_final:")
+
+    def on_unhandled(self, method, *args):
+        if self.log_unhandled:
+            self.logger.log("on_unhandled: method: %s, args: %s" % (method, args))
+
+    def run(self):
+        try:
+            Container(self).run()
+        except Exception as e:
+            self.error = "Container run exception: %s" % (e)
+            self.logger.log(self.error)
+            self.logger.dump()
+        time.sleep(0.2)
+
+
+class MaxMessageSizeBlockOversize(TestCase):
+    """
+    verify that maxMessageSize blocks oversize messages
+    """
+    @classmethod
+    def setUpClass(cls):
+        """Start the router"""
+        super(MaxMessageSizeBlockOversize, cls).setUpClass()
+
+        def router(name, mode, max_size, extra):
+            config = [
+                ('router', {'mode': mode,
+                            'id': name,
+                            'allowUnsettledMulticast': 'yes',
+                            'workerThreads': W_THREADS}),
+                ('listener', {'role': 'normal',
+                              'port': cls.tester.get_port()}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+                ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true', 'maxMessageSize': max_size, 'defaultVhost': '$default'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+                ('vhost', {'hostname': '$default', 'allowUnknownUser': 'true',
+                    'groups': [(
+                        '$default', {
+                            'users': '*',
+                            'maxConnections': 100,
+                            'remoteHosts': '*',
+                            'sources': '*',
+                            'targets': '*',
+                            'allowAnonymousSender': 'true',
+                            'allowWaypointLinks': 'true',
+                            'allowDynamicSource': 'true'
+                        }
+                    )]}
+                )
+            ]
+
+            if extra:
+                config.extend(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+            return cls.routers[-1]
+
+        # configuration:
+        # two edge routers connected via 2 interior routers with max sizes
+        #
+        #  +-------+    +---------+    +---------+    +-------+
+        #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
+        #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
+        #  +-------+    +---------+    +---------+    +-------+
+        #
+        # Note:
+        #  * Messages whose senders connect to INT.A or INT.B are subject to max message size
+        #    defined for the ingress router only.
+        #  * Messages whose senders connect to EA1 or EB1 are subject to max message size
+        #    defined for the ingress router. If the message is forwarded through the
+        #    connected interior router then the message is subject to another max message size
+        #    defined by the interior router.
+
+        cls.routers = []
+
+        interrouter_port = cls.tester.get_port()
+        cls.INTA_edge_port   = cls.tester.get_port()
+        cls.INTB_edge_port   = cls.tester.get_port()
+
+        router('INT.A', 'interior', INTA_MAX_SIZE,
+               [('listener', {'role': 'inter-router',
+                              'port': interrouter_port}),
+                ('listener', {'role': 'edge', 'port': cls.INTA_edge_port})])
+        cls.INT_A = cls.routers[0]
+        cls.INT_A.listener = cls.INT_A.addresses[0]
+
+        router('INT.B', 'interior', INTB_MAX_SIZE,
+               [('connector', {'name': 'connectorToA',
+                               'role': 'inter-router',
+                               'port': interrouter_port}),
+                ('listener', {'role': 'edge',
+                              'port': cls.INTB_edge_port})])
+        cls.INT_B = cls.routers[1]
+        cls.INT_B.listener = cls.INT_B.addresses[0]
+
+        router('EA1', 'edge', EA1_MAX_SIZE,
+               [('listener', {'name': 'rc', 'role': 'route-container',
+                              'port': cls.tester.get_port()}),
+                ('connector', {'name': 'uplink', 'role': 'edge',
+                               'port': cls.INTA_edge_port})])
+        cls.EA1 = cls.routers[2]
+        cls.EA1.listener = cls.EA1.addresses[0]
+
+        router('EB1', 'edge', EB1_MAX_SIZE,
+               [('connector', {'name': 'uplink',
+                               'role': 'edge',
+                               'port': cls.INTB_edge_port,
+                               'maxFrameSize': 1024}),
+                ('listener', {'name': 'rc', 'role': 'route-container',
+                              'port': cls.tester.get_port()})])
+        cls.EB1 = cls.routers[3]
+        cls.EB1.listener = cls.EB1.addresses[0]
+
+        cls.wait_router_network_connected()
+
+    @classmethod
+    def wait_router_network_connected(cls):
+        cls.INT_A.wait_router_connected('INT.B')
+        cls.INT_B.wait_router_connected('INT.A')
+        cls.EA1.wait_connectors()
+        cls.EB1.wait_connectors()
+
+    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
+        p = self.popen(
+            ['qdmanage'] +
+            cmd.split(' ') +
+            ['--bus',
+             address or self.address(),
+             '--indent=-1', '--timeout', str(TIMEOUT)],
+            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
+            universal_newlines=True)
+        out = p.communicate(input)[0]
+        try:
+            p.teardown()
+        except Exception as e:
+            raise Exception("%s\n%s" % (e, out))
+        return out
+
+    def sense_n_closed_lines(self, routername):
+        """
+        Read a router's log file and count how many size-exceeded lines are in it.
+        :param routername:
+        :return: (int, int) tuple with counts of lines in and lines out
+        """
+        with open("../setUpClass/%s.log" % routername, 'r') as router_log:
+            log_lines = router_log.read().split("\n")
+        i_closed_lines = [s for s in log_lines if OVERSIZE_CONDITION_DESC in s and "<-" in s]
+        o_closed_lines = [s for s in log_lines if OVERSIZE_CONDITION_DESC in s and "->" in s]
+        return (len(i_closed_lines), len(o_closed_lines))
+
+
+    # verify that a message can go through an edge EB1 and get blocked by interior INT.B
+    #
+    #  +-------+    +---------+    +---------+    +-------+
+    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
+    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
+    #  +-------+    +---------+    +---------+    +-------+
+    #                                    |             ^
+    #                                    V             |
+    #                               +--------+    +-------+
+    #                               |receiver|    |sender |
+    #                               |        |    |199,800|
+    #                               +--------+    +-------+
+    #
+    def test_60_block_oversize_EB1_INTB_at_INTB(self):
+        ibefore, obefore = self.sense_n_closed_lines("EB1")
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize,
+                                           MaxMessageSizeBlockOversize.EB1,
+                                           MaxMessageSizeBlockOversize.INT_B,
+                                           "e60",
+                                           message_size=EB1_MAX_SIZE - OVER_UNDER,
+                                           print_to_console=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_60 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+        # Verify that interrouter link was shut down
+        iafter, oafter = self.sense_n_closed_lines("EB1")
+        idelta = iafter - ibefore
+        odelta = oafter - obefore
+        success = odelta == 0 and idelta == 1
+        if (not success):
+            test.logger.log("FAIL: N closed events in log file did not increment by 1. oBefore: %d, oAfter: %d, iBefore:%d, iAfter:%d" %
+                            (obefore, oafter, ibefore, iafter))
+            test.logger.dump()
+            self.assertTrue(success, "Expected router to generate close with condition: message size exceeded")
+
+
+    # verify that a message can go through an edge EB1 and get blocked by interior INT.B
+    #
+    #  +-------+    +---------+    +---------+    +-------+
+    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
+    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
+    #  +-------+    +---------+    +---------+    +-------+
+    #      |                                           ^
+    #      V                                           |
+    #   +--------+                                +-------+
+    #   |receiver|                                |sender |
+    #   |        |                                |199,800|
+    #   +--------+                                +-------+
+    #
+    def test_61_block_oversize_EB1_EA1_at_INTB(self):
+        ibefore, obefore = self.sense_n_closed_lines("EB1")
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize,
+                                           MaxMessageSizeBlockOversize.EB1,
+                                           MaxMessageSizeBlockOversize.EA1,
+                                           "e61",
+                                           message_size=EB1_MAX_SIZE - OVER_UNDER,
+                                           print_to_console=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_61 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+        # Verify that interrouter link was shut down
+        iafter, oafter = self.sense_n_closed_lines("EB1")
+        idelta = iafter - ibefore
+        odelta = oafter - obefore
+        success = odelta == 0 and idelta == 1
+        if (not success):
+            test.logger.log("FAIL: N closed events in log file did not increment by 1. oBefore: %d, oAfter: %d, iBefore:%d, iAfter:%d" %
+                            (obefore, oafter, ibefore, iafter))
+            test.logger.dump()
+            self.assertTrue(success, "Expected router to generate close with condition: message size exceeded")
+
+
+    # see what happens when a message must be blocked by edge and also by interior
+    #
+    #  +-------+    +---------+    +---------+    +-------+
+    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
+    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
+    #  +-------+    +---------+    +---------+    +-------+
+    #                                    |             ^
+    #                                    V             |
+    #                               +--------+    +-------+
+    #                               |receiver|    |sender |
+    #                               |        |    |200,200|
+    #                               +--------+    +-------+
+    #
+    def test_70_block_oversize_EB1_INTB_at_both(self):
+        ibefore, obefore = self.sense_n_closed_lines("EB1")
+        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize,
+                                           MaxMessageSizeBlockOversize.EB1,
+                                           MaxMessageSizeBlockOversize.INT_B,
+                                           "e70",
+                                           message_size=EB1_MAX_SIZE + OVER_UNDER,
+                                           blocked_by_both=True,
+                                           print_to_console=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_70 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+        # Verify that interrouter link was shut down
+        # EB1 must close connection to sender (odelta == 1) but
+        # INT.B may or may not close the edge-interior link. Sometimes EB1 senses the
+        # oversize condition before it has forwarded too many bytes of the first message
+        # to INT.B. Then EB1 aborts the first message to INT.B and INT.B never
+        # detects an oversize condition.
+        iafter, oafter = self.sense_n_closed_lines("EB1")
+        idelta = iafter - ibefore
+        odelta = oafter - obefore
+        success = odelta == 1 and (idelta == 0 or idelta == 1)
+        if (not success):
+            test.logger.log("FAIL: N closed events in log file did not increment by 1. oBefore: %d, oAfter: %d, iBefore:%d, iAfter:%d" %
+                            (obefore, oafter, ibefore, iafter))
+            test.logger.dump()
+            self.assertTrue(success, "Expected router to generate close with condition: message size exceeded")
+
+
+    # Verify that a multicast can go through an edge EB1 and get blocked by interior INT.B
+    #
+    #  +-------+    +---------+    +---------+    +-------+
+    #  | rcvr  |    |  rcvr   |    |  rcvr   |    | rcvr  |
+    #  |  no   |    |   no    |    |   no    |    | yes   |
+    #  +-------+    +---------+    +---------+    +-------+
+    #      ^            ^              ^             ^
+    #      |            |              |             |
+    #  +-------+    +---------+    +---------+    +-------+
+    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
+    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
+    #  +-------+    +---------+    +---------+    +-------+
+    #                                                ^
+    #                                                |
+    #                                             +-------+
+    #                                             |sender |
+    #                                             |199,800|
+    #                                             +-------+
+    #
+    def test_80_block_multicast_EB1_INTB_at_INTB(self):
+        ibefore, obefore = self.sense_n_closed_lines("EB1")
+        count = 10
+        test = OversizeMulticastTransferTest(MaxMessageSizeBlockOversize,
+                                             MaxMessageSizeBlockOversize.EB1,
+                                             MaxMessageSizeBlockOversize.routers,
+                                             "multicast/e80",
+                                             [0, 0, 0, count],
+                                             blocked_by_ingress = False,
+                                             blocked_by_interior = True,
+                                             message_size=EB1_MAX_SIZE - OVER_UNDER,
+                                             count = count,
+                                             print_to_console=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_80 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+        # Verify that interrouter link was shut down
+        iafter, oafter = self.sense_n_closed_lines("EB1")
+        idelta = iafter - ibefore
+        odelta = oafter - obefore
+        success = odelta == 0 and idelta == 1
+        if (not success):
+            test.logger.log("FAIL: N closed events in log file did not increment by 1. oBefore: %d, oAfter: %d, iBefore:%d, iAfter:%d" %
+                            (obefore, oafter, ibefore, iafter))
+            test.logger.dump()
+            self.assertTrue(success, "Expected router to generate close with condition: message size exceeded")
+
+
+    # Verify that a multicast blocked by edge ingress goes to no receivers
+    #
+    #  +-------+    +---------+    +---------+    +-------+
+    #  | rcvr  |    |  rcvr   |    |  rcvr   |    | rcvr  |
+    #  |  no   |    |   no    |    |   no    |    | no    |
+    #  +-------+    +---------+    +---------+    +-------+
+    #      ^            ^              ^             ^
+    #      |            |              |             |
+    #  +-------+    +---------+    +---------+    +-------+
+    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
+    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
+    #  +-------+    +---------+    +---------+    +-------+
+    #      ^
+    #      |
+    #  +-------+
+    #  |sender |
+    #  | 50,200|
+    #  +-------+
+    #
+    def test_81_block_multicast_EA1(self):
+        ibefore, obefore = self.sense_n_closed_lines("EA1")
+        count = 10
+        test = OversizeMulticastTransferTest(MaxMessageSizeBlockOversize,
+                                             MaxMessageSizeBlockOversize.EA1,
+                                             MaxMessageSizeBlockOversize.routers,
+                                             "multicast/e81",
+                                             [0, 0, 0, 0],
+                                             blocked_by_ingress = True,
+                                             blocked_by_interior = False,
+                                             message_size=EA1_MAX_SIZE + OVER_UNDER,
+                                             count = count,
+                                             print_to_console=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_81 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+        # Verify that interrouter link was shut down
+        iafter, oafter = self.sense_n_closed_lines("EA1")
+        idelta = iafter - ibefore
+        odelta = oafter - obefore
+        success = odelta == 1 and idelta == 0
+        if (not success):
+            test.logger.log("FAIL: N closed events in log file did not increment by 1. oBefore: %d, oAfter: %d, iBefore:%d, iAfter:%d" %
+                            (obefore, oafter, ibefore, iafter))
+            test.logger.dump()
+            self.assertTrue(success, "Expected router to generate close with condition: message size exceeded")
+
+
+    # Verify that a multicast blocked by interior ingress goes to no receivers
+    #
+    #  +-------+    +---------+    +---------+    +-------+
+    #  | rcvr  |    |  rcvr   |    |  rcvr   |    | rcvr  |
+    #  |  no   |    |   no    |    |   no    |    | no    |
+    #  +-------+    +---------+    +---------+    +-------+
+    #      ^            ^              ^             ^
+    #      |            |              |             |
+    #  +-------+    +---------+    +---------+    +-------+
+    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
+    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
+    #  +-------+    +---------+    +---------+    +-------+
+    #                   ^
+    #                   |
+    #               +-------+
+    #               |sender |
+    #               |100,200|
+    #               +-------+
+    #
+    def test_82_block_multicast_INTA(self):
+        ibefore, obefore = self.sense_n_closed_lines("INT.A")
+        count = 10
+        test = OversizeMulticastTransferTest(MaxMessageSizeBlockOversize,
+                                             MaxMessageSizeBlockOversize.INT_A,
+                                             MaxMessageSizeBlockOversize.routers,
+                                             "multicast/e82",
+                                             [0, 0, 0, 0],
+                                             blocked_by_ingress=True,
+                                             blocked_by_interior=False,
+                                             message_size=INTA_MAX_SIZE + OVER_UNDER,
+                                             count=count,
+                                             print_to_console=False)
+        test.run()
+        if test.error is not None:
+            test.logger.log("test_82 test error: %s" % (test.error))
+            test.logger.dump()
+        self.assertTrue(test.error is None)
+
+        # Verify that interrouter link was shut down
+        iafter, oafter = self.sense_n_closed_lines("INT.A")
+        idelta = iafter - ibefore
+        odelta = oafter - obefore
+        success = odelta == 1 and idelta == 0
+        if (not success):
+            test.logger.log(
+                "FAIL: N closed events in log file did not increment by 1. oBefore: %d, oAfter: %d, iBefore:%d, iAfter:%d" %
+                (obefore, oafter, ibefore, iafter))
+            test.logger.dump()
+            self.assertTrue(success, "Expected router to generate close with condition: message size exceeded")
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())
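
Note on the message bodies built in on_start above: each body is assembled from
50-character chunks that begin with a label of the form [address:index:offset,
are padded with a per-message character, and are then truncated to the
requested size. The following is a minimal standalone sketch of that idea and
is not part of the commit. The function name make_body and the chunk_len
parameter are made up for illustration.

```python
def make_body(address, msg_index, size, chunk_len=50):
    """Return a string of exactly `size` characters built from labelled chunks.

    Hypothetical helper: it mirrors the loop in on_start but is not
    code from the commit.
    """
    padchars = "abcdefghijklmnopqrstuvwxyz@#$%"
    # Each message gets its own pad character so bodies are easy to tell apart.
    padchar = padchars[msg_index % len(padchars)]
    body = ""
    while len(body) < size:
        # The label records which message this is and the offset of the chunk.
        chunk = "[%s:%d:%d" % (address, msg_index, len(body))
        # A negative repeat count yields an empty string, so long labels are safe.
        chunk += padchar * (chunk_len - len(chunk))
        body += chunk
    # Trim the last chunk so the body length matches the request exactly.
    return body[:size]


if __name__ == "__main__":
    print(make_body("e60", 1, 120))
```

The size requested here is the length of the body string only. As the commit
message notes, the router measures the whole annotated message, so the octet
count on the wire is larger than this value.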

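Note on sense_n_closed_lines above: the tests count matching log lines before
and after a run and assert on the difference, so lines left behind by earlier
tests do not affect the result. The following is a minimal sketch of that
before/after pattern using a temporary file. The names count_condition_lines
and demo_delta, the description string, and the log line contents are all
made up for illustration and do not reflect the real router log format.

```python
import os
import tempfile


def count_condition_lines(log_path, description):
    """Return (inbound, outbound) counts of lines that mention `description`."""
    with open(log_path, "r") as log_file:
        lines = log_file.read().split("\n")
    inbound = [s for s in lines if description in s and "<-" in s]
    outbound = [s for s in lines if description in s and "->" in s]
    return len(inbound), len(outbound)


def demo_delta():
    """Count before, append one matching outbound line, count after."""
    description = "example oversize description"  # hypothetical text
    fd, path = tempfile.mkstemp(suffix=".log")
    os.close(fd)
    try:
        with open(path, "w") as log_file:
            log_file.write("made-up line -> open\n")
        before = count_condition_lines(path, description)
        with open(path, "a") as log_file:
            log_file.write("made-up line -> close: %s\n" % description)
        after = count_condition_lines(path, description)
        # Only the difference matters, not the absolute counts.
        return (after[0] - before[0], after[1] - before[1])
    finally:
        os.remove(path)


if __name__ == "__main__":
    print(demo_delta())
```

A line that contains both arrow markers is counted in both totals, which is
also how the list comprehensions in the test behave.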

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org
