This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new a754e18 DISPATCH-975: Enforce max message size on message ingress a754e18 is described below commit a754e1897ece244c97b1d76163e7b96902316627 Author: Chuck Rolke <c...@apache.org> AuthorDate: Wed Apr 15 10:33:25 2020 -0400 DISPATCH-975: Enforce max message size on message ingress MaxMessageSize may be specified globally, per vhost, or per vhost user group. The global setting applies to all vhosts for which maxMessageSize is unspecified. The vhost setting applies to all vhost user groups for which maxMessageSize is unspecified. The vhost user group setting overrides all other settings. A maxMessageSize setting of zero disables maxMessageSize enforcement. Links over which maxMessageSize is being enforced will advertise the size in the max-message-size field of the Attach frame. Qpid-dispatch ignores the max-message-size field received in incoming Attach frames. Message size for maxMessageSize purposes is calculated to be the number of AMQP octets in the Annotated Message. This includes the header, delivery-annotations, message-annotations, properties, application-properties, application-data, and footer sections. Administrators and users must be aware that a "message" consisting of a single character string (the application-data) will be much larger over the wire after properties and annotations have been inserted. Max message size is enforced on message/transfer ingress only. Interior routers form a trusted domain where messages that have entered by one interior router are allowed to any destination in the system. Edge routers are not trusted; messages from an edge router must be checked again by the attached interior router before they are forwarded to the trusted network. Messages from an interior router to an edge router are not checked again by the edge router. * Transfer size is checked as a message enters an interior router. 
A message that passes an interior router size check is free to go to any destination on that router or any other interior or edge router. * Transfer size is checked as a message enters an edge router. A message that passes an edge router size check is free to go to any destination on that edge router. If the message is uplinked to an interior router then that interior router may apply another max message size check possibly using a different maximum message size setting. When a message exceeds max size then: * Disposition of rejected is returned to the sender for that delivery. * Copies of the message being delivered through the router network are aborted. * The connection to the sender is closed with: condition: "amqp:connection:forced" description: "Message size exceeded" Self test includes a four-router linear network with two interior and two edge routers. Tests try oversize and undersize messages with a variety of sender and receiver attachment points in that network. The test routers are configured with varying max message sizes. TODO: * No tests yet for link route. * No doc book pages describing feature configuration and behavior. 
--- include/qpid/dispatch/container.h | 2 + include/qpid/dispatch/message.h | 6 + include/qpid/dispatch/server.h | 2 + python/qpid_dispatch/management/qdrouter.json | 23 +- python/qpid_dispatch_internal/management/config.py | 2 + .../qpid_dispatch_internal/policy/policy_local.py | 37 +- .../policy/policy_manager.py | 8 + src/container.c | 7 +- src/message.c | 41 +- src/message_private.h | 3 + src/policy.c | 48 +- src/policy.h | 16 + src/router_core/delivery.c | 6 + src/router_core/delivery.h | 2 + src/router_core/transfer.c | 2 +- src/router_node.c | 114 ++- src/server.c | 4 + tests/CMakeLists.txt | 2 + tests/system_tests_policy_oversize_basic.py | 792 +++++++++++++++ tests/system_tests_policy_oversize_compound.py | 1058 ++++++++++++++++++++ 20 files changed, 2111 insertions(+), 64 deletions(-) diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h index 99eb3bd..054c6d5 100644 --- a/include/qpid/dispatch/container.h +++ b/include/qpid/dispatch/container.h @@ -238,6 +238,8 @@ void qd_session_free(qd_session_t *qd_ssn); bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn); qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn); +void qd_connection_log_policy_denial(qd_link_t *link, const char *text); + // handy macros to get around PROTON-2184: pn_session_set_context aborts if // context==0 (can remove this once qdrouter requires >= proton 0.31.x) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 9ed2bc2..afc39c1 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -432,6 +432,12 @@ void qd_message_set_aborted(const qd_message_t *msg, bool aborted); */ uint8_t qd_message_get_priority(qd_message_t *msg); +/** + * True if message is larger that maxMessageSize + * @param msg A pointer to the message + * @return + */ +bool qd_message_oversize(const qd_message_t *msg); ///@} diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index 
043baa5..abe5a9a 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -604,6 +604,8 @@ bool qd_connection_strip_annotations_in(const qd_connection_t *c); void qd_connection_wake(qd_connection_t *ctx); +int qd_connection_max_message_size(const qd_connection_t *c); + /** * @} */ diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 9254b7d..ec678d0 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -1869,6 +1869,13 @@ "required": false, "create": true }, + "maxMessageSize": { + "type": "integer", + "default": 0, + "description": "The maximum size in bytes of AMQP message transfers allowed for this router. This limit is applied only to transfers over user connections and is not applied to interrouter or edge router connections. This limit may be overridden by vhost or by vhost user group settings. A value of zero disables this limit.", + "required": false, + "create": true + }, "enableVhostPolicy": { "type": "boolean", "default": false, @@ -1905,10 +1912,11 @@ "graph": true, "description": "The sum of all vhost sender and receiver denials." }, + "maxMessageSizeDenied": {"type": "integer", "graph": true}, "totalDenials": { "type": "integer", "graph": true, - "description": "The total number of connection and link denials." + "description": "The total number of connection, link, and transfer denials." } } }, @@ -1933,6 +1941,11 @@ "create": true, "update": true }, + "maxMessageSize": { + "type": "integer", + "description": "Optional maximum size in bytes of AMQP message transfers allowed for connections to this vhost. This limit overrides the policy maxMessageSize value and may be overridden by vhost user group settings. 
A value of zero disables this limit.", + "required": false + }, "maxConnectionsPerUser": { "type": "integer", "default": 65535, @@ -1993,6 +2006,11 @@ "required": false, "create": false }, + "maxMessageSize": { + "type": "integer", + "description": "Optional maximum size in bytes of AMQP message transfers allowed for connections created by users in this group. This limit overrides the policy and vhost maxMessageSize values. A value of zero disables this limit.", + "required": false + }, "maxFrameSize": { "type": "integer", "description": "The largest frame, in bytes, that may be sent on this connection. Non-zero policy values overwrite values specified for a listener object (AMQP Open, max-frame-size).", @@ -2130,7 +2148,8 @@ "sessionDenied": {"type": "integer", "graph": true}, "senderDenied": {"type": "integer", "graph": true}, - "receiverDenied": {"type": "integer", "graph": true} + "receiverDenied": {"type": "integer", "graph": true}, + "maxMessageSizeDenied": {"type": "integer", "graph": true} } }, diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py index 8cf1940..bd1595f 100644 --- a/python/qpid_dispatch_internal/management/config.py +++ b/python/qpid_dispatch_internal/management/config.py @@ -201,10 +201,12 @@ def configure_dispatch(dispatch, lib_handle, filename): policyDir = config.by_type('policy')[0]['policyDir'] policyDefaultVhost = config.by_type('policy')[0]['defaultVhost'] useHostnamePatterns = config.by_type('policy')[0]['enableVhostNamePatterns'] + maxMessageSize = config.by_type('policy')[0]['maxMessageSize'] for a in config.by_type("policy"): configure(a) agent.policy.set_default_vhost(policyDefaultVhost) agent.policy.set_use_hostname_patterns(useHostnamePatterns) + agent.policy.set_max_message_size(maxMessageSize) # Remaining configuration for t in "sslProfile", "authServicePlugin", "listener", "connector", \ diff --git a/python/qpid_dispatch_internal/policy/policy_local.py 
b/python/qpid_dispatch_internal/policy/policy_local.py index 37733f1..cfe3400 100644 --- a/python/qpid_dispatch_internal/policy/policy_local.py +++ b/python/qpid_dispatch_internal/policy/policy_local.py @@ -120,7 +120,7 @@ class PolicyCompiler(object): """ Validate incoming configuration for legal schema. - Warn about section options that go unused. - - Disallow negative max connection numbers. + - Disallow negative max connection/message size numbers. - Check that connectionOrigins resolve to IP hosts. - Enforce internal consistency, """ @@ -131,6 +131,7 @@ class PolicyCompiler(object): PolicyKeys.KW_IGNORED_TYPE, PolicyKeys.KW_VHOST_NAME, PolicyKeys.KW_MAXCONN, + PolicyKeys.KW_MAX_MESSAGE_SIZE, PolicyKeys.KW_MAXCONNPERHOST, PolicyKeys.KW_MAXCONNPERUSER, PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT, @@ -232,7 +233,7 @@ class PolicyCompiler(object): def compile_app_settings(self, vhostname, usergroup, policy_in, policy_out, warnings, errors): """ - Compile a schema from processed json format to local internal format. + Compile a vhostUserGroupSettings schema from processed json format to local internal format. @param[in] name vhost name @param[in] policy_in user config settings @param[out] policy_out validated Internal format @@ -248,7 +249,7 @@ class PolicyCompiler(object): policy_out[PolicyKeys.KW_REMOTE_HOSTS] = '' # DISPATCH-1277 - KW_MAX_FRAME_SIZE must be defaulted to 16384 not 2147483647 policy_out[PolicyKeys.KW_MAX_FRAME_SIZE] = 16384 - policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = 0 + policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = None policy_out[PolicyKeys.KW_MAX_SESSION_WINDOW] = 2147483647 policy_out[PolicyKeys.KW_MAX_SESSIONS] = 65536 policy_out[PolicyKeys.KW_MAX_SENDERS] = 2147483647 @@ -411,7 +412,7 @@ class PolicyCompiler(object): def compile_access_ruleset(self, name, policy_in, policy_out, warnings, errors): """ - Compile a schema from processed json format to local internal format. 
+ Compile a vhost schema from processed json format to local internal format. @param[in] name vhost name @param[in] policy_in raw policy to be validated @param[out] policy_out validated Internal format @@ -429,6 +430,7 @@ class PolicyCompiler(object): policy_out[PolicyKeys.KW_MAXCONNPERUSER] = 65535 policy_out[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT] = False policy_out[PolicyKeys.KW_GROUPS] = {} + policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = None # validate the options for key, val in dict_iteritems(policy_in): @@ -445,6 +447,14 @@ class PolicyCompiler(object): errors.append(msg) return False policy_out[key] = val + elif key in [PolicyKeys.KW_MAX_MESSAGE_SIZE + ]: + if not self.validateNumber(val, 0, 0, cerror): + msg = ("Policy vhost '%s' option '%s' has error '%s'." % + (name, key, cerror[0])) + errors.append(msg) + return False + policy_out[key] = val elif key in [PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]: if not type(val) is bool: errors.append("Policy vhost '%s' option '%s' must be of type 'bool' but is '%s'" % @@ -609,6 +619,9 @@ class PolicyLocal(object): # When true policy ruleset definitions are propagated to C code self.use_hostname_patterns = False + # _max_message_size + # holds global value from policy config object + self._max_message_size = 0 # # Service interfaces # @@ -830,6 +843,13 @@ class PolicyLocal(object): upolicy.update(ruleset[PolicyKeys.KW_GROUPS][groupname]) + maxsize = upolicy.get(PolicyKeys.KW_MAX_MESSAGE_SIZE, None) + if maxsize is None: + maxsize = ruleset.get(PolicyKeys.KW_MAX_MESSAGE_SIZE, None) + if maxsize is None: + maxsize = self._max_message_size + upolicy[PolicyKeys.KW_MAX_MESSAGE_SIZE] = maxsize + upolicy[PolicyKeys.KW_CSTATS] = self.statsdb[vhost].get_cstats() return True except Exception as e: @@ -851,6 +871,15 @@ class PolicyLocal(object): self._manager.log_trace( "Policy internal error closing connection id %s. 
%s" % (conn_id, str(e))) + def set_max_message_size(self, size): + """ + record max message size from policy config object + :param size: + :return:ls + + """ + self._max_message_size = size + # # def test_load_config(self): diff --git a/python/qpid_dispatch_internal/policy/policy_manager.py b/python/qpid_dispatch_internal/policy/policy_manager.py index fe5e4e8..f7f35fb 100644 --- a/python/qpid_dispatch_internal/policy/policy_manager.py +++ b/python/qpid_dispatch_internal/policy/policy_manager.py @@ -161,6 +161,14 @@ class PolicyManager(object): @return: none """ self._policy_local.close_connection(conn_id) + + def set_max_message_size(self, size): + """ + Policy has set global maxMessageSize. + :param size: + :return: none + """ + self._policy_local.set_max_message_size(size) # # # diff --git a/src/container.c b/src/container.c index d08800b..a728de7 100644 --- a/src/container.c +++ b/src/container.c @@ -159,7 +159,7 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) } -static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link) +static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, int max_size) { qd_node_t *node = container->default_node; @@ -191,6 +191,9 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link) link->node = node; link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(pn_link); + if (max_size) { + pn_link_set_max_message_size(pn_link, (uint64_t)max_size); + } pn_link_set_context(pn_link, link); node->ntype->incoming_handler(node->context, link); } @@ -652,7 +655,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } qd_conn->n_senders++; } - setup_incoming_link(container, pn_link); + setup_incoming_link(container, pn_link, qd_connection_max_message_size(qd_conn)); } } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE) handle_link_open(container, pn_link); diff --git a/src/message.c b/src/message.c index 
a3420a1..0d7b08b 100644 --- a/src/message.c +++ b/src/message.c @@ -29,6 +29,7 @@ #include "compose_private.h" #include "connection_manager_private.h" #include "aprintf.h" +#include "policy.h" #include <string.h> #include <ctype.h> #include <stdio.h> @@ -941,7 +942,6 @@ qd_message_t *qd_message() msg->content->lock = sys_mutex(); sys_atomic_init(&msg->content->ref_count, 1); msg->content->parse_depth = QD_DEPTH_NONE; - return (qd_message_t*) msg; } @@ -1244,14 +1244,13 @@ void qd_message_set_tag_sent(qd_message_t *in_msg, bool tag_sent) /** * Receive and discard large messages for which there is no destination. * Don't waste resources by putting the message into internal buffers. - * Don't fiddle with locking as no sender is competing with reception. + * Message locking is not required since the message content buffers are untouched. */ qd_message_t *discard_receive(pn_delivery_t *delivery, pn_link_t *link, qd_message_t *msg_in) { qd_message_pvt_t *msg = (qd_message_pvt_t*)msg_in; - while (1) { #define DISCARD_BUFFER_SIZE (128 * 1024) char dummy[DISCARD_BUFFER_SIZE]; @@ -1261,13 +1260,17 @@ qd_message_t *discard_receive(pn_delivery_t *delivery, // have read all available pn_link incoming bytes break; } else if (rc == PN_EOS || rc < 0) { - // end of message or error. Call the message complete - msg->content->receive_complete = true; + // End of message or error: finalize message_receive handling msg->content->aborted = pn_delivery_aborted(delivery); qd_nullify_safe_ptr(&msg->content->input_link_sp); - pn_record_t *record = pn_delivery_attachments(delivery); pn_record_set(record, PN_DELIVERY_CTX, 0); + if (msg->content->oversize) { + // Aborting the content disposes of downstream copies. + // This has no effect on the received message. + msg->content->aborted = true; + } + msg->content->receive_complete = true; break; } else { // rc was > 0. bytes were read and discarded. 
@@ -1308,11 +1311,13 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) msg->strip_annotations_in = qd_connection_strip_annotations_in(qdc); pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF); pn_record_set(record, PN_DELIVERY_CTX, (void*) msg); + msg->content->max_message_size = qd_connection_max_message_size(qdc); } // // The discard flag indicates we should keep reading the input stream // but not process the message for delivery. + // Oversize messages are also discarded. // if (msg->content->discard) { return discard_receive(delivery, link, (qd_message_t *)msg); @@ -1416,10 +1421,24 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) recv_error = true; } else if (rc > 0) { // - // We have received a positive number of bytes for the message. Advance - // the cursor in the buffer. + // We have received a positive number of bytes for the message. + // Advance the cursor in the buffer. // qd_buffer_insert(content->pending, rc); + + // Handle maxMessageSize violations + if (content->max_message_size) { + content->bytes_received += rc; + if (content->bytes_received > content->max_message_size) + { + qd_connection_t *conn = qd_link_connection(qdl); + qd_connection_log_policy_denial(qdl, "DENY AMQP Transfer maxMessageSize exceeded"); + qd_policy_count_max_size_event(link, conn); + content->discard = true; + content->oversize = true; + return discard_receive(delivery, link, (qd_message_t*)msg); + } + } } else { // // We received zero bytes, and no PN_EOS. 
This means that we've received @@ -2231,3 +2250,9 @@ void qd_message_set_aborted(const qd_message_t *msg, bool aborted) qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg; msg_pvt->content->aborted = aborted; } + +bool qd_message_oversize(const qd_message_t *msg) +{ + qd_message_content_t * mc = MSG_CONTENT(msg); + return mc->oversize; +} diff --git a/src/message_private.h b/src/message_private.h index 7bc66dc..ba13538 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -109,6 +109,8 @@ typedef struct { qd_parsed_field_t *ma_pf_to_override; qd_parsed_field_t *ma_pf_trace; int ma_int_phase; + int max_message_size; // configured max; 0 if no max to enforce + int bytes_received; // bytes returned by pn_link_recv() when enforcing max_message_size uint32_t fanout; // The number of receivers for this message, including in-process subscribers. qd_link_t_sp input_link_sp; // message received on this link @@ -120,6 +122,7 @@ typedef struct { bool disable_q2_holdoff; // Disable the Q2 flow control bool priority_parsed; bool priority_present; + bool oversize; // policy oversize handling in effect uint8_t priority; // The priority of this message } qd_message_content_t; diff --git a/src/policy.c b/src/policy.c index 896540c..4a45bed 100644 --- a/src/policy.c +++ b/src/policy.c @@ -46,6 +46,7 @@ static uint64_t n_connections = 0; static uint64_t n_denied = 0; static uint64_t n_processed = 0; static uint64_t n_links_denied = 0; +static uint64_t n_maxsize_messages_denied = 0; static uint64_t n_total_denials = 0; // @@ -83,6 +84,9 @@ static PyObject * module = 0; ALLOC_DEFINE(qd_policy_settings_t); +// Policy log module used outside of policy proper +qd_log_source_t* policy_log_source = 0; + // // Policy configuration/statistics management interface // @@ -116,6 +120,7 @@ qd_policy_t *qd_policy(qd_dispatch_t *qd) policy->tree_lock = sys_mutex(); policy->hostname_tree = qd_parse_tree_new(QD_PARSE_TREE_ADDRESS); stats_lock = sys_mutex(); + policy_log_source = 
policy->log_source; qd_log(policy->log_source, QD_LOG_TRACE, "Policy Initialized"); return policy; @@ -206,7 +211,8 @@ qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t *entity) qd_policy_denial_counts_t *dc = (qd_policy_denial_counts_t*)ccounts; if (!qd_entity_set_long(entity, "sessionDenied", dc->sessionDenied) && !qd_entity_set_long(entity, "senderDenied", dc->senderDenied) && - !qd_entity_set_long(entity, "receiverDenied", dc->receiverDenied) + !qd_entity_set_long(entity, "receiverDenied", dc->receiverDenied) && + !qd_entity_set_long(entity, "maxMessageSizeDenied", dc->maxSizeMessagesDenied) ) return QD_ERROR_NONE; return qd_error_code(); @@ -218,13 +224,14 @@ qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t *entity) **/ qd_error_t qd_entity_refresh_policy(qd_entity_t* entity, void *unused) { // Return global stats - uint64_t np, nd, nc, nl, nt; + uint64_t np, nd, nc, nl, nm, nt; sys_mutex_lock(stats_lock); { np = n_processed; nd = n_denied; nc = n_connections; nl = n_links_denied; + nm = n_maxsize_messages_denied; nt = n_total_denials; } sys_mutex_unlock(stats_lock); @@ -232,6 +239,7 @@ qd_error_t qd_entity_refresh_policy(qd_entity_t* entity, void *unused) { !qd_entity_set_long(entity, "connectionsDenied", nd) && !qd_entity_set_long(entity, "connectionsCurrent", nc) && !qd_entity_set_long(entity, "linksDenied", nl) && + !qd_entity_set_long(entity, "maxMessageSizeDenied", nm) && !qd_entity_set_long(entity, "totalDenials", nt) ) return QD_ERROR_NONE; @@ -303,19 +311,21 @@ void qd_policy_socket_close(qd_policy_t *policy, const qd_connection_t *conn) qd_python_unlock(lock_state); } const char *hostname = qd_connection_name(conn); - int ssnDenied = 0; - int sndDenied = 0; - int rcvDenied = 0; + uint64_t ssnDenied = 0; + uint64_t sndDenied = 0; + uint64_t rcvDenied = 0; + uint64_t sizDenied = 0; if (conn->policy_settings && conn->policy_settings->denialCounts) { ssnDenied = conn->policy_settings->denialCounts->sessionDenied; sndDenied = 
conn->policy_settings->denialCounts->senderDenied; rcvDenied = conn->policy_settings->denialCounts->receiverDenied; - } - qd_log(policy->log_source, QD_LOG_DEBUG, + sizDenied = conn->policy_settings->denialCounts->maxSizeMessagesDenied; + qd_log(policy->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Connection '%s' closed with resources n_sessions=%d, n_senders=%d, n_receivers=%d, " - "sessions_denied=%d, senders_denied=%d, receivers_denied=%d. nConnections= %d.", + "sessions_denied=%ld, senders_denied=%ld, receivers_denied=%ld. nConnections= %ld.", conn->connection_id, hostname, conn->n_sessions, conn->n_senders, conn->n_receivers, - ssnDenied, sndDenied, rcvDenied, n_connections); + ssnDenied, sndDenied, rcvDenied, sizDenied, n_connections); + } } @@ -502,6 +512,7 @@ bool qd_policy_open_fetch_settings( settings->maxSessions = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0); settings->maxSenders = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0); settings->maxReceivers = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0); + settings->maxMessageSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0); if (!settings->allowAnonymousSender) { //don't override if enabled by authz plugin settings->allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false); } @@ -666,6 +677,20 @@ void _qd_policy_deny_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_ } +// +// +void qd_policy_count_max_size_event(pn_link_t *link, qd_connection_t *qd_conn) +{ + sys_mutex_lock(stats_lock); + n_maxsize_messages_denied++; + n_total_denials++; + sys_mutex_unlock(stats_lock); + // TODO: denialCounts is shared among connections and should be protected also + if (qd_conn->policy_settings && qd_conn->policy_settings->denialCounts) { + qd_conn->policy_settings->denialCounts->maxSizeMessagesDenied++; + } +} + /** * Given a char return true if it is a parse_tree token separater */ @@ -1474,3 +1499,8 @@ char * 
qd_policy_compile_allowed_csv(char * csv) free(dup); return result; } + + +qd_log_source_t* qd_policy_log_source() { + return policy_log_source; +} diff --git a/src/policy.h b/src/policy.h index a076eb3..44e8794 100644 --- a/src/policy.h +++ b/src/policy.h @@ -39,6 +39,7 @@ struct qd_policy_denial_counts_s { uint64_t sessionDenied; uint64_t senderDenied; uint64_t receiverDenied; + uint64_t maxSizeMessagesDenied; }; typedef struct qd_policy_t qd_policy_t; @@ -49,6 +50,7 @@ struct qd_policy__settings_s { int maxSessions; int maxSenders; int maxReceivers; + int maxMessageSize; bool allowDynamicSource; bool allowAnonymousSender; bool allowUserIdProxy; @@ -231,6 +233,7 @@ char * qd_policy_host_pattern_lookup(qd_policy_t *policy, const char *hostPatter * @return the ruleset string to be used in policy settings. */ char * qd_policy_compile_allowed_csv(char * csv); + /** * Approve sending of message on anonymous link based on connection's policy. * @@ -238,4 +241,17 @@ char * qd_policy_compile_allowed_csv(char * csv); * @param[in] qd_conn dispatch connection with policy settings */ bool qd_policy_approve_message_target(qd_iterator_t *address, qd_connection_t *qd_conn); + +/** + * Increment counters for a link when policy maxMessageSize limit is exceeded. 
+ * + * @param[in] pn_link proton link being with delivery/transfer being rejected + * @param[in] qd_conn dispatch connection with policy settings and counts + **/ +void qd_policy_count_max_size_event(pn_link_t *link, qd_connection_t *qd_conn); + +/** + * Return POLICY log_source to log policy + */ +qd_log_source_t* qd_policy_log_source(); #endif diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index 715d8d0..9e0400e 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -50,6 +50,12 @@ qdr_link_t *qdr_delivery_link(const qdr_delivery_t *delivery) } +bool qdr_delivery_oversize(const qdr_delivery_t *delivery) +{ + return delivery && delivery->msg && qd_message_oversize(delivery->msg); +} + + bool qdr_delivery_send_complete(const qdr_delivery_t *delivery) { if (!delivery) diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h index 9a7edfb..8807996 100644 --- a/src/router_core/delivery.h +++ b/src/router_core/delivery.h @@ -76,6 +76,7 @@ ALLOC_DECLARE(qdr_delivery_t); bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery); bool qdr_delivery_send_complete(const qdr_delivery_t *delivery); +bool qdr_delivery_oversize(const qdr_delivery_t *delivery); void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context); void *qdr_delivery_get_context(const qdr_delivery_t *delivery); @@ -89,6 +90,7 @@ uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery); void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition); void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted); +bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery); qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery); qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery); diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index f885eef..3dab635 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -185,7 +185,7 @@ int 
qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) DEQ_REMOVE_HEAD(link->undelivered); dlv->link_work = 0; - if (settled) { + if (settled || qdr_delivery_oversize(dlv) || qdr_delivery_is_aborted(dlv)) { dlv->where = QDR_DELIVERY_NOWHERE; qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - remove from undelivered list"); } else { diff --git a/src/router_node.c b/src/router_node.c index a735280..d2e2d36 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -27,6 +27,7 @@ #include "entity_cache.h" #include "router_private.h" #include "delivery.h" +#include "policy.h" #include <qpid/dispatch/router_core.h> #include <qpid/dispatch/proton_utils.h> #include <proton/sasl.h> @@ -287,8 +288,9 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_messa const qd_server_config_t *cf = qd_connection_config(conn); if (!cf) return; char buf[qd_message_repr_len()]; - const char *msg_str = qd_message_aborted(msg) ? - "aborted message" : qd_message_repr(msg, buf, sizeof(buf), cf->log_bits); + const char *msg_str = qd_message_oversize(msg) ? "oversize message" : + qd_message_aborted(msg) ? "aborted message" : + qd_message_repr(msg, buf, sizeof(buf), cf->log_bits); if (msg_str) { const char *src = pn_terminus_get_address(pn_link_source(pn_link)); const char *tgt = pn_terminus_get_address(pn_link_target(pn_link)); @@ -335,49 +337,67 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) qd_message_t *msg = qd_message_receive(pnd); bool receive_complete = qd_message_receive_complete(msg); - if (receive_complete) { - log_link_message(conn, pn_link, msg); + if (!qd_message_oversize(msg)) { + // message not rejected as oversize + if (receive_complete) { + log_link_message(conn, pn_link, msg); - // - // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance(). 
- // - pn_link_advance(pn_link); - next_delivery = pn_link_current(pn_link) != 0; + // + // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance(). + // + pn_link_advance(pn_link); + next_delivery = pn_link_current(pn_link) != 0; - uint64_t local_disp = qdr_delivery_disposition(delivery); - if (local_disp != 0) { - pn_delivery_update(pnd, local_disp); + uint64_t local_disp = qdr_delivery_disposition(delivery); + if (local_disp != 0) { + pn_delivery_update(pnd, local_disp); + } } - } - if (qd_message_is_discard(msg)) { - // - // Message has been marked for discard, no further processing necessary - // + if (qd_message_is_discard(msg)) { + // + // Message has been marked for discard, no further processing necessary + // + if (receive_complete) { + // If this discarded delivery has already been settled by proton, + // set the presettled flag on the delivery to true if it is not already true. + // Since the entire message has already been received, we directly call the + // function to set the pre-settled flag since we cannot go thru the core-thread + // to do this since the delivery has been discarded. + // Discarded streaming deliveries are not put thru the core thread via the continue action. + if (pn_delivery_settled(pnd)) + qdr_delivery_set_presettled(delivery); + + + // note: expected that the code that set discard has handled + // setting disposition and updating flow! + pn_delivery_settle(pnd); + if (delivery) { + // if delivery already exists then the core thread discarded this + // delivery, it will eventually free the qdr_delivery_t and its + // associated message - do not free it here. 
+ qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd); + } else { + qd_message_free(msg); + } + } + return next_delivery; + } + } else { + // message is oversize if (receive_complete) { - // If this discarded delivery has already been settled by proton, - // set the presettled flag on the delivery to true if it is not already true. - // Since the entire message has already been received, we directly call the - // function to set the pre-settled flag since we cannot go thru the core-thread - // to do this since the delivery has been discarded. - // Discarded streaming deliveries are not put thru the core thread via the continue action. - if (pn_delivery_settled(pnd)) - qdr_delivery_set_presettled(delivery); - - - // note: expected that the code that set discard has handled - // setting disposition and updating flow! + // reject and settle the incoming delivery + pn_delivery_update(pnd, PN_REJECTED); pn_delivery_settle(pnd); - if (delivery) { - // if delivery already exists then the core thread discarded this - // delivery, it will eventually free the qdr_delivery_t and its - // associated message - do not free it here. - qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd); - } else { - qd_message_free(msg); - } + // close the connection + pn_connection_t * pn_conn = qd_connection_pn(conn); + pn_condition_t * cond = pn_connection_condition(pn_conn); + (void) pn_condition_set_name( cond, QD_AMQP_COND_CONNECTION_FORCED); + (void) pn_condition_set_description(cond, "Message size exceeded"); + pn_connection_close(pn_conn); } - return next_delivery; + return false; + // oversize messages are not processed any further } // @@ -1913,3 +1933,21 @@ void qd_link_restart_rx(qd_link_t *in_link) qd_connection_invoke_deferred(in_conn, deferred_AMQP_rx_handler, safe_ptr); } } + + +// Issue a warning POLICY log message with connection and link identities +// prepended to the policy denial text string. 
+void qd_connection_log_policy_denial(qd_link_t *link, const char *text) +{ + qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); + uint64_t l_id = 0; + uint64_t c_id = 0; + if (rlink) { + l_id = rlink->identity; + if (rlink->conn) { + c_id = rlink->conn->identity; + } + } + qd_log(qd_policy_log_source(), QD_LOG_WARNING, "[C%"PRIu64"][L%"PRIu64"] %s", + c_id, l_id, text); +} diff --git a/src/server.c b/src/server.c index b3a553e..c0e57c3 100644 --- a/src/server.c +++ b/src/server.c @@ -1665,3 +1665,7 @@ sys_mutex_t *qd_server_get_activation_lock(qd_server_t * server) { return server->conn_activation_lock; } + +int qd_connection_max_message_size(const qd_connection_t *c) { + return (c && c->policy_settings) ? c->policy_settings->maxMessageSize : 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e34db23..041ee26 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -99,6 +99,8 @@ foreach(py_test_module system_tests_default_distribution system_tests_deprecated system_tests_policy + system_tests_policy_oversize_basic + system_tests_policy_oversize_compound system_tests_protocol_family system_tests_protocol_settings system_tests_qdmanage diff --git a/tests/system_tests_policy_oversize_basic.py b/tests/system_tests_policy_oversize_basic.py new file mode 100644 index 0000000..8040817 --- /dev/null +++ b/tests/system_tests_policy_oversize_basic.py @@ -0,0 +1,792 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +import unittest as unittest +import os, json, re, signal +import sys +import time + +from system_test import TestCase, Qdrouterd, main_module, Process, TIMEOUT, DIR, QdManager, Logger +from subprocess import PIPE, STDOUT +from proton import ConnectionException, Timeout, Url, symbol, Message +from proton.handlers import MessagingHandler +from proton.reactor import Container, ReceiverOption +from proton.utils import BlockingConnection, LinkDetached, SyncRequestResponse +from qpid_dispatch_internal.policy.policy_util import is_ipv6_enabled +from qpid_dispatch_internal.compat import dict_iteritems +from test_broker import FakeBroker + +# How many worker threads? +W_THREADS = 2 + +# Define oversize denial condition +OVERSIZE_CONDITION_NAME = "amqp:connection:forced" +OVERSIZE_CONDITION_DESC = "Message size exceeded" + + +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + +# +# DISPATCH-975 Detect that an oversize message is blocked. +# These tests check basic blocking where the sender is blocked by +# the ingress routers. It does not check compound blocking where +# oversize is allowed or denied by an ingress edge router but also +# denied by the uplink interior router. + +class OversizeMessageTransferTest(MessagingHandler): + """ + This test connects a sender and a receiver.
Then it tries to send _count_ + number of messages of the given size through the router or router network. + + With expect_block=True the ingress router should detect the sender's oversize + message and close the sender connection. The receiver may receive + aborted message indications but that is not guaranteed. If any aborted + messages are received then the count must be at most one. + The test is a success when the sender receives a connection error with + oversize indication and the receiver has not received too many aborts. + + With expect_block=False sender messages should be received normally. + The test is a success when n_accepted == count. + """ + def __init__(self, sender_host, receiver_host, test_address, + message_size=100000, count=10, expect_block=True, print_to_console=False): + super(OversizeMessageTransferTest, self).__init__() + self.sender_host = sender_host + self.receiver_host = receiver_host + self.test_address = test_address + self.msg_size = message_size + self.count = count + self.expect_block = expect_block + + self.sender_conn = None + self.receiver_conn = None + self.error = None + self.sender = None + self.receiver = None + self.proxy = None + + self.n_sent = 0 + self.n_rcvd = 0 + self.n_accepted = 0 + self.n_rejected = 0 + self.n_aborted = 0 + self.n_connection_error = 0 + self.shut_down = False + + self.logger = Logger(title=("OversizeMessageTransferTest - %s" % (self.test_address)), print_to_console=print_to_console) + self.log_unhandled = False + + def timeout(self): + self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_rejected=%d n_aborted=%d" % \ + (self.n_sent, self.n_rcvd, self.n_rejected, self.n_aborted) + self.logger.log("self.timeout " + self.error) + self._shut_down_test() + + def on_start(self, event): + self.logger.log("on_start") + self.timer = event.reactor.schedule(10, Timeout(self)) + self.logger.log("on_start: opening receiver connection to %s" % (self.receiver_host.addresses[0])) + self.receiver_conn = 
event.container.connect(self.receiver_host.addresses[0]) + self.logger.log("on_start: opening sender connection to %s" % (self.sender_host.addresses[0])) + self.sender_conn = event.container.connect(self.sender_host.addresses[0]) + self.logger.log("on_start: Creating receiver") + self.receiver = event.container.create_receiver(self.receiver_conn, self.test_address) + self.logger.log("on_start: Creating sender") + self.sender = event.container.create_sender(self.sender_conn, self.test_address) + self.logger.log("on_start: done") + + def send(self): + while self.sender.credit > 0 and self.n_sent < self.count: + # construct message in identifiable chunks + body_msg = "" + padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[self.n_sent % 30] + while len(body_msg) < self.msg_size: + chunk = "[%s:%d:%d" % (self.test_address, self.n_sent, len(body_msg)) + padlen = 50 - len(chunk) + chunk += padchar * padlen + body_msg += chunk + if len(body_msg) > self.msg_size: + body_msg = body_msg[:self.msg_size] + self.logger.log("send. address:%s message:%d of %s length=%d" % + (self.test_address, self.n_sent, self.count, self.msg_size)) + m = Message(body=body_msg) + self.sender.send(m) + self.n_sent += 1 + + def on_sendable(self, event): + if event.sender == self.sender: + self.logger.log("on_sendable") + self.send() + + def on_message(self, event): + if self.expect_block: + # All messages should violate maxMessageSize. + # Receiving any is an error. + self.error = "Received a message. Expected to receive no messages."
+ self.logger.log(self.error) + self._shut_down_test() + else: + self.n_rcvd += 1 + self.accept(event.delivery) + self._check_done() + + def on_connection_remote_close(self, event): + if self.shut_down: + return + if event.connection == self.sender_conn: + if not event.connection.remote_condition is None: + if event.connection.remote_condition.name == OVERSIZE_CONDITION_NAME and \ + event.connection.remote_condition.description == OVERSIZE_CONDITION_DESC: + self.logger.log("on_connection_remote_close: sender closed with correct condition") + self.n_connection_error += 1 + self.sender_conn.close() + self.sender_conn = None + else: + # sender closed but for wrong reason + self.error = "sender close error: Expected name: %s, description: %s, but received name: %s, description: %s" % ( + OVERSIZE_CONDITION_NAME, OVERSIZE_CONDITION_DESC, + event.connection.remote_condition.name, event.connection.remote_condition.description) + self.logger.log(self.error) + else: + self.error = "sender close error: Expected a remote_condition but there was none." + self.logger.log(self.error) + else: + # connection error but not for sender + self.error = "unexpected connection close error: wrong connection closed." 
+ self.logger.log(self.error) + self._check_done() + + def _shut_down_test(self): + self.shut_down = True + if self.timer: + self.timer.cancel() + self.timer = None + if self.sender: + self.sender.close() + self.sender = None + if self.receiver: + self.receiver.close() + self.receiver = None + if self.sender_conn: + self.sender_conn.close() + self.sender_conn = None + if self.receiver_conn: + self.receiver_conn.close() + self.receiver_conn = None + + def _check_done(self): + current = ("check_done: sent=%d rcvd=%d rejected=%d aborted=%d connection_error:%d" % + (self.n_sent, self.n_rcvd, self.n_rejected, self.n_aborted, self.n_connection_error)) + self.logger.log(current) + if self.error is not None: + self.logger.log("TEST FAIL") + self._shut_down_test() + else: + done = (self.n_connection_error == 1) \ + if self.expect_block else \ + (self.n_sent == self.count and self.n_rcvd == self.count) + + if done: + self.logger.log("TEST DONE!!!") + # self.log_unhandled = True # verbose debugging + self._shut_down_test() + + def on_rejected(self, event): + self.n_rejected += 1 + if self.expect_block: + self.logger.log("on_rejected: entry") + self._check_done() + else: + self.error = "Unexpected on_reject" + self.logger.log(self.error) + self._check_done() + + def on_aborted(self, event): + self.logger.log("on_aborted") + self.n_aborted += 1 + self._check_done() + + def on_error(self, event): + self.error = "Container error" + self.logger.log(self.error) + self._shut_down_test() + + def on_unhandled(self, method, *args): + if self.log_unhandled: + self.logger.log("on_unhandled: method: %s, args: %s" % (method, args)) + + def run(self): + try: + Container(self).run() + except Exception as e: + self.error = "Container run exception: %s" % (e) + self.logger.log(self.error) + self.logger.dump() + + + +# For the next test case define max sizes for each router. 
+# These are the configured maxMessageSize values +EA1_MAX_SIZE = 50000 +INTA_MAX_SIZE = 100000 +INTB_MAX_SIZE = 150000 +EB1_MAX_SIZE = 200000 + +# Interior routers enforce max size directly. +# Edge routers are also checked by the attached interior router. + +# Block tests that use edge routers that send messages to the network must +# account for the fact that the attached interior router will apply +# another max size. These tests do not check against EB1 max for the +# sender if the receiver is on EA1, INTA, or INTB since INTB's max +# would kick in and cause a false positive. + +# Tests that check for allowing near-max sizes use the minimum of +# the edge router's max and the attached interior router's max. + +# The bytes-over and bytes-under max that should trigger allow or deny. +# Messages with content this much over should be blocked while +# messages with content this much under should be allowed. +# * client overhead is typically 16 bytes or so +# * interrouter overhead is much larger with annotations +OVER_UNDER = 200 + + +class MaxMessageSizeBlockOversize(TestCase): + """ + verify that maxMessageSize blocks oversize messages + """ + @classmethod + def setUpClass(cls): + """Start the router""" + super(MaxMessageSizeBlockOversize, cls).setUpClass() + + def router(name, mode, max_size, extra): + config = [ + ('router', {'mode': mode, + 'id': name, + 'allowUnsettledMulticast': 'yes', + 'workerThreads': W_THREADS}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true', 'maxMessageSize': max_size, 'defaultVhost': '$default'}), + ('vhost', {'hostname': '$default', 'allowUnknownUser': 'true', + 'groups': [( + '$default', { + 'users': '*', + 'maxConnections': 100, + 'remoteHosts': '*', + 'sources': '*', + 'targets': '*', + 'allowAnonymousSender': 'true', + 'allowWaypointLinks': 'true', + 'allowDynamicSource':
'true' + } + )]} + ) + ] + + if extra: + config.extend(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + return cls.routers[-1] + + # configuration: + # two edge routers connected via 2 interior routers with max sizes + # + # +-------+ +---------+ +---------+ +-------+ + # | EA1 |<==>| INT.A |<==>| INT.B |<==>| EB1 | + # | 50,000| | 100,000 | | 150,000 | |200,000| + # +-------+ +---------+ +---------+ +-------+ + # + # Note: + # * Messages whose senders connect to INT.A or INT.B are subject to max message size + # defined for the ingress router only. + # * Messages whose senders connect to EA1 or EB1 are subject to max message size + # defined for the ingress router. If the message is forwarded through the + # connected interior router then the message is subject to another max message size + # defined by the interior router. + + cls.routers = [] + + interrouter_port = cls.tester.get_port() + cls.INTA_edge_port = cls.tester.get_port() + cls.INTB_edge_port = cls.tester.get_port() + + router('INT.A', 'interior', INTA_MAX_SIZE, + [('listener', {'role': 'inter-router', + 'port': interrouter_port}), + ('listener', {'role': 'edge', 'port': cls.INTA_edge_port})]) + cls.INT_A = cls.routers[0] + cls.INT_A.listener = cls.INT_A.addresses[0] + + router('INT.B', 'interior', INTB_MAX_SIZE, + [('connector', {'name': 'connectorToA', + 'role': 'inter-router', + 'port': interrouter_port}), + ('listener', {'role': 'edge', + 'port': cls.INTB_edge_port})]) + cls.INT_B = cls.routers[1] + cls.INT_B.listener = cls.INT_B.addresses[0] + + router('EA1', 'edge', EA1_MAX_SIZE, + [('listener', {'name': 'rc', 'role': 'route-container', + 'port': cls.tester.get_port()}), + ('connector', {'name': 'uplink', 'role': 'edge', + 'port': cls.INTA_edge_port})]) + cls.EA1 = cls.routers[2] + cls.EA1.listener = cls.EA1.addresses[0] + + router('EB1', 'edge', EB1_MAX_SIZE, + [('connector', {'name': 'uplink', + 'role': 'edge', + 'port':
cls.INTB_edge_port, + 'maxFrameSize': 1024}), + ('listener', {'name': 'rc', 'role': 'route-container', + 'port': cls.tester.get_port()})]) + cls.EB1 = cls.routers[3] + cls.EB1.listener = cls.EB1.addresses[0] + + cls.INT_A.wait_router_connected('INT.B') + cls.INT_B.wait_router_connected('INT.A') + cls.EA1.wait_connectors() + cls.EB1.wait_connectors() + + def test_40_block_oversize_INTA_INTA(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.INT_A, + "e40", + message_size=INTA_MAX_SIZE + OVER_UNDER, + expect_block=True, + print_to_console=False) + test.run() + if test.error is not None: + test.logger.log("test_40 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_41_block_oversize_INTA_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.INT_B, + "e41", + message_size=INTA_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_41 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_42_block_oversize_INTA_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.EA1, + "e42", + message_size=INTA_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_42 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_43_block_oversize_INTA_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.EB1, + "e43", + message_size=INTA_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_43 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_44_block_oversize_INTB_INTA(self): + test = 
OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.INT_A, + "e44", + message_size=INTB_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_44 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_45_block_oversize_INTB_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.INT_B, + "e45", + message_size=INTB_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_45 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_46_block_oversize_INTB_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.EA1, + "e46", + message_size=INTB_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_46 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_47_block_oversize_INTB_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.EB1, + "e47", + message_size=INTB_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_47 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_48_block_oversize_EA1_INTA(self): + if EA1_MAX_SIZE >= INTA_MAX_SIZE: + self.skipTest("EA1 sending to INT.A may be blocked by EA1 limit and also by INT.A limit. 
That condition is tested in compound test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.INT_A, + "e48", + message_size=EA1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_48 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_49_block_oversize_EA1_INTB(self): + if EA1_MAX_SIZE >= INTA_MAX_SIZE: + self.skipTest("EA1 sending to INT.B may be blocked by EA1 limit and also by INT.A limit. That condition is tested in compound test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.INT_B, + "e49", + message_size=EA1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_49 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4a_block_oversize_EA1_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.EA1, + "e4a", + message_size=EA1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4a test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4b_block_oversize_EA1_EB1(self): + if EA1_MAX_SIZE >= INTA_MAX_SIZE: + self.skipTest("EA1 sending to EB1 may be blocked by EA1 limit and also by INT.A limit. 
That condition is tested in compound test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.EB1, + "e4b", + message_size=EA1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4b test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4c_block_oversize_EB1_INTA(self): + if EB1_MAX_SIZE > INTB_MAX_SIZE: + self.skipTest("EB1 sending to INT.A may be blocked by EB1 limit and also by INT.B limit. That condition is tested in compound test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.INT_A, + "e4c", + message_size=EB1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4c test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4d_block_oversize_EB1_INTB(self): + if EB1_MAX_SIZE > INTB_MAX_SIZE: + self.skipTest("EB1 sending to INT.B may be blocked by EB1 limit and also by INT.B limit. That condition is tested in compound test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.INT_B, + "e4d", + message_size=EB1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4d test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4e_block_oversize_EB1_EA1(self): + if EB1_MAX_SIZE > INTB_MAX_SIZE: + self.skipTest("EB1 sending to EA1 may be blocked by EB1 limit and also by INT.B limit. 
That condition is tested in compound test.") + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.EA1, + "e4e", + message_size=EB1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4e test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_4f_block_oversize_EB1_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.EB1, + "e4f", + message_size=EB1_MAX_SIZE + OVER_UNDER, + expect_block=True) + test.run() + if test.error is not None: + test.logger.log("test_4f test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + # + # tests under maxMessageSize should not block + # + def test_50_allow_undersize_INTA_INTA(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.INT_A, + "e50", + message_size=INTA_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_50 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_51_allow_undersize_INTA_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.INT_B, + "e51", + message_size=INTA_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_51 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_52_allow_undersize_INTA_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.EA1, + "e52", + message_size=INTA_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_52 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + 
def test_53_allow_undersize_INTA_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_A, + MaxMessageSizeBlockOversize.EB1, + "e53", + message_size=INTA_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_53 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_54_allow_undersize_INTB_INTA(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.INT_A, + "e54", + message_size=INTB_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_54 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_55_allow_undersize_INTB_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.INT_B, + "e55", + message_size=INTB_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_55 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_56_allow_undersize_INTB_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.EA1, + "e56", + message_size=INTB_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_56 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_57_allow_undersize_INTB_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.INT_B, + MaxMessageSizeBlockOversize.EB1, + "e57", + message_size=INTB_MAX_SIZE - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_57 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_58_allow_undersize_EA1_INTA(self): + 
test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.INT_A, + "e58", + message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_58 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def test_59_allow_undersize_EA1_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.INT_B, + "e59", + message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_59 test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def test_5a_allow_undersize_EA1_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.EA1, + "e5a", + message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5a test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def test_5b_allow_undersize_EA1_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EA1, + MaxMessageSizeBlockOversize.EB1, + "e5b", + message_size=min(EA1_MAX_SIZE, INTA_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5b test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + def test_5c_allow_undersize_EB1_INTA(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.INT_A, + "e5c", + message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5c test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def 
test_5d_allow_undersize_EB1_INTB(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.INT_B, + "e5d", + message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5d test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def test_5e_allow_undersize_EB1_EA1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.EA1, + "e5e", + message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5e test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + + def test_5f_allow_undersize_EB1_EB1(self): + test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize.EB1, + MaxMessageSizeBlockOversize.EB1, + "e5f", + message_size=min(EB1_MAX_SIZE, INTB_MAX_SIZE) - OVER_UNDER, + expect_block=False) + test.run() + if test.error is not None: + test.logger.log("test_5f test error: %s" % (test.error)) + test.logger.dump() + self.assertTrue(test.error is None) + + +if __name__ == '__main__': + unittest.main(main_module()) diff --git a/tests/system_tests_policy_oversize_compound.py b/tests/system_tests_policy_oversize_compound.py new file mode 100644 index 0000000..35da8b5 --- /dev/null +++ b/tests/system_tests_policy_oversize_compound.py @@ -0,0 +1,1058 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +import unittest as unittest +import os, json, re, signal +import sys +import time + +from system_test import TestCase, Qdrouterd, main_module, Process, TIMEOUT, DIR, QdManager, Logger +from subprocess import PIPE, STDOUT +from proton import ConnectionException, Timeout, Url, symbol, Message +from proton.handlers import MessagingHandler +from proton.reactor import Container, ReceiverOption +from proton.utils import BlockingConnection, LinkDetached, SyncRequestResponse +from qpid_dispatch_internal.policy.policy_util import is_ipv6_enabled +from qpid_dispatch_internal.compat import dict_iteritems +from test_broker import FakeBroker + +# How many worker threads? +W_THREADS = 2 + +# Define oversize denial condition +OVERSIZE_CONDITION_NAME = "amqp:connection:forced" +OVERSIZE_CONDITION_DESC = "Message size exceeded" + + +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + +# For the next test case define max sizes for each router. +# These are the configured maxMessageSize values +EA1_MAX_SIZE = 50000 +INTA_MAX_SIZE = 100000 +INTB_MAX_SIZE = 150000 +EB1_MAX_SIZE = 200000 + +# Interior routers check for max message size at message ingress over all connections +# except interrouter connections. +# Edge routers check for max message size at message ingress over all connections +# except interior router connections. 
# Edge routers may check a max message size and allow the message to be delivered
# to destinations on that edge router. However, if the message is passed to an
# interior router then the message is subject to the interior router's max size
# before the message is forwarded by the interior router network.

# The bytes-over and bytes-under max that should trigger allow or deny.
# Messages with content this much over should be blocked while
# messages with content this much under should be allowed.
# * client overhead is typically 16 bytes or so
# * interrouter overhead is much larger with annotations
OVER_UNDER = 200

# Alert:
# This module has two test handler classes that are laid out about the same:
#   OversizeMessageTransferTest
#   OversizeMulticastTransferTest
# The MessageTransfer test does a single sender and single receiver while
# the MulticastTransfer test does a single sender and four receivers.
# Logic common to both tests lives in _OversizeTransferTestBase so that a
# fix made there applies to both tests.

# Characters used to pad message bodies; the pad character is selected by
# message index so that each message is recognizable.
_PAD_CHARS = "abcdefghijklmnopqrstuvwxyz@#$%"

# Nominal length of one identifiable chunk of message body.
_CHUNK_LEN = 50

# Delay passed to the reactor scheduler for the test watchdog timer.
_REACTOR_TIMEOUT = 10


def _make_message_body(test_address, idx, size):
    """
    Build a message body of exactly 'size' characters made of identifiable chunks.

    Each chunk starts with "[<address>:<message index>:<offset>" and is padded
    with a per-message pad character up to _CHUNK_LEN characters. A chunk
    whose prefix is already longer than _CHUNK_LEN gets no padding. The
    final chunk is truncated so that the body is never longer than 'size'.

    :param test_address: AMQP address embedded in each chunk
    :param idx: message index; selects the pad character
    :param size: body length in characters
    :return: the body string; empty when size is zero or negative
    """
    padchar = _PAD_CHARS[idx % len(_PAD_CHARS)]
    chunks = []
    total = 0
    while total < size:
        chunk = "[%s:%d:%d" % (test_address, idx, total)
        # a negative repeat count yields an empty string: no padding
        chunk += padchar * (_CHUNK_LEN - len(chunk))
        chunks.append(chunk)
        total += len(chunk)
    return "".join(chunks)[:size]


class _OversizeTransferTestBase(MessagingHandler):
    """
    State and event handling shared by the unicast and multicast oversize tests.

    Subclasses supply receiver handling and the completion criteria by
    implementing on_start, on_message, on_aborted and the hook methods
    _current, _is_done, _expect_reject, _close_receivers and
    _close_receiver_connections.
    """
    def __init__(self, title, test_class, sender_host, test_address,
                 message_size, count, print_to_console):
        """
        :param title: logger title prefix, normally the test class name
        :param test_class: test class - has wait-connection function
        :param sender_host: router for sender connection
        :param test_address: sender/receiver AMQP address
        :param message_size: in bytes
        :param count: how many messages to send
        :param print_to_console: print logs as they happen
        """
        super(_OversizeTransferTestBase, self).__init__()
        self.test_class = test_class
        self.sender_host = sender_host
        self.test_address = test_address
        self.msg_size = message_size
        self.count = count
        self.messages = []

        self.sender_conn = None
        self.error = None
        self.sender = None
        self.proxy = None
        # Set in on_start. Initialized here so that _shut_down_test() is
        # safe to call even if on_start never ran.
        self.timer = None

        self.network_stable = False
        self.n_sent = 0
        self.n_accepted = 0
        self.n_rejected = 0
        self.n_modified = 0
        self.n_released = 0
        self.n_send_settled = 0
        self.n_connection_error = 0
        self.shut_down = False

        self.logger = Logger(title=("%s - %s" % (title, self.test_address)),
                             print_to_console=print_to_console)
        # Set True for verbose diagnostics of otherwise unhandled proton callbacks
        self.log_unhandled = False

    #
    # Hooks implemented by the concrete tests
    #
    def _current(self):
        """Return a one-line summary of the test counters for logging."""
        raise NotImplementedError

    def _is_done(self):
        """Return True when the counters show that the test has completed."""
        raise NotImplementedError

    def _expect_reject(self):
        """Return True if the test configuration allows a rejected disposition."""
        raise NotImplementedError

    def _close_receivers(self):
        """Close and forget every open receiver link."""
        raise NotImplementedError

    def _close_receiver_connections(self):
        """Close and forget every open receiver connection."""
        raise NotImplementedError

    #
    # on_start building blocks
    #
    def _start_timer_and_wait_for_network(self, event):
        """Schedule the watchdog timer and wait until the router network is connected."""
        self.logger.log("on_start")

        self.logger.log("on_start: secheduling reactor timeout")
        self.timer = event.reactor.schedule(_REACTOR_TIMEOUT, Timeout(self))

        self.logger.log("Waiting for router network to stabilize")
        self.test_class.wait_router_network_connected()
        self.network_stable = True

    def _generate_messages(self):
        """Create self.count messages, each with a body of self.msg_size characters."""
        self.logger.log("on_start: generating messages")
        for idx in range(self.count):
            body = _make_message_body(self.test_address, idx, self.msg_size)
            self.messages.append(Message(body=body))

    def _open_sender(self, event):
        """Open the sender connection and link; this is the last step of on_start."""
        self.logger.log("on_start: opening sender connection to %s" % (self.sender_host.addresses[0]))
        self.sender_conn = event.container.connect(self.sender_host.addresses[0])

        self.logger.log("on_start: Creating sender")
        self.sender = event.container.create_sender(self.sender_conn, self.test_address)

        self.logger.log("on_start: done")

    #
    # Shared behavior
    #
    def timeout(self):
        """Watchdog expiry: record the error together with the counters and shut down."""
        self.error = "Timeout Expired " + self._current()
        self.logger.log("self.timeout " + self.error)
        self._shut_down_test()

    def send(self):
        """Send pregenerated messages while the sender has credit and messages remain."""
        while self.sender.credit > 0 and self.n_sent < self.count:
            m = self.messages[self.n_sent]
            self.logger.log("send. address:%s message:%d of %s length=%d" % (
                self.test_address, self.n_sent, self.count, self.msg_size))
            self.sender.send(m)
            self.n_sent += 1

    def on_sendable(self, event):
        if event.sender == self.sender:
            self.logger.log("on_sendable")
            self.send()

    def on_connection_remote_close(self, event):
        """
        Only the sender connection may be closed by the peer, and only with
        the oversize condition. Any other remote close is a test error.
        """
        if self.shut_down:
            return
        if event.connection == self.sender_conn:
            condition = event.connection.remote_condition
            if condition is None:
                self.error = "sender close error: Expected a remote_condition but there was none."
                self.logger.log(self.error)
            elif condition.name == OVERSIZE_CONDITION_NAME and \
                    condition.description == OVERSIZE_CONDITION_DESC:
                self.logger.log("on_connection_remote_close: sender closed with correct condition")
                self.n_connection_error += 1
                self.sender_conn.close()
                self.sender_conn = None
            else:
                # sender closed but for wrong reason
                self.error = "sender close error: Expected name: %s, description: %s, but received name: %s, description: %s" % (
                    OVERSIZE_CONDITION_NAME, OVERSIZE_CONDITION_DESC,
                    condition.name, condition.description)
                self.logger.log(self.error)
        else:
            # connection error but not for sender
            self.error = "unexpected connection close error: wrong connection closed."
            self.logger.log(self.error)
        self._check_done()

    def _shut_down_test(self):
        """
        Cancel the timer and close the links and then the connections.
        Every handle is cleared after it is closed so repeated calls are harmless.
        """
        self.shut_down = True
        if self.timer:
            self.timer.cancel()
            self.timer = None
        if self.sender:
            self.sender.close()
            self.sender = None
        self._close_receivers()
        if self.sender_conn:
            self.sender_conn.close()
            self.sender_conn = None
        self._close_receiver_connections()

    def _check_done(self):
        """Log the counters and shut the test down on error or on completion."""
        self.logger.log("check_done: " + self._current())
        if self.error is not None:
            self.logger.log("TEST FAIL")
            self._shut_down_test()
        elif self._is_done():
            self.logger.log("TEST DONE!!!")
            self._shut_down_test()

    def on_rejected(self, event):
        self.n_rejected += 1
        if self._expect_reject():
            self.logger.log("on_rejected: entry")
        else:
            self.error = "Unexpected on_reject"
            self.logger.log(self.error)
        self._check_done()

    def on_settled(self, event):
        self.logger.log("on_settled")
        if event.connection == self.sender_conn:
            self.logger.log("on_settled: sender connection")
            self.n_send_settled += 1
        self._check_done()

    def on_error(self, event):
        self.error = "Container error"
        self.logger.log(self.error)
        # _shut_down_test skips handles that were never opened or are already closed
        self._shut_down_test()

    def on_link_error(self, event):
        self.error = event.link.remote_condition.name
        self.logger.log("on_link_error: %s" % (self.error))
        # Link errors may prevent normal test shutdown so don't even try.
        raise Exception(self.error)

    def on_reactor_final(self, event):
        self.logger.log("on_reactor_final:")

    def on_unhandled(self, method, *args):
        if self.log_unhandled:
            self.logger.log("on_unhandled: method: %s, args: %s" % (method, args))

    def run(self):
        """
        Run the proton container until the test shuts itself down.
        An exception escaping the container is recorded in self.error.
        """
        try:
            Container(self).run()
        except Exception as e:
            self.error = "Container run exception: %s" % (e)
            self.logger.log(self.error)
            self.logger.dump()
        # NOTE(review): brief pause before the caller inspects results,
        # presumably to let router log output settle - confirm.
        time.sleep(0.2)


class OversizeMessageTransferTest(_OversizeTransferTestBase):
    """
    This test connects a sender and a receiver. Then it tries to send _count_
    number of messages of the given size through the router or router network.

    Messages are to pass through an edge router and get blocked by an interior
    or messages are to be blocked by both the edge and the interior.

    When 'blocked_by_both' is false then:

    * The ingress router should allow the sender's oversize message.
    * The message is blocked by the uplink router by rejecting the message
      and closing the connection between the interior and edge routers.
    * The receiver may receive aborted message indications but that is
      not guaranteed.
    * If any aborted messages are received then the count must be at most one.

    When 'blocked_by_both' is true then:

    * The ingress edge router will reject and close the connection on the first message
    * The second message may be aborted because the connection between the
      edge router and the interior router was closed
    * The remainder of the messages are going into a closed connection and
      will receive no settlement.
    """
    def __init__(self, test_class, sender_host, receiver_host, test_address,
                 message_size=100000, count=10, blocked_by_both=False, print_to_console=False):
        """
        Construct an instance of the unicast test
        :param test_class: test class - has wait-connection function
        :param sender_host: router for sender connection
        :param receiver_host: router for receiver connection
        :param test_address: sender/receiver AMQP address
        :param message_size: in bytes
        :param count: how many messages to send
        :param blocked_by_both: true if edge router messages are also blocked by interior
        :param print_to_console: print logs as they happen
        """
        super(OversizeMessageTransferTest, self).__init__(
            "OversizeMessageTransferTest", test_class, sender_host, test_address,
            message_size, count, print_to_console)
        self.receiver_host = receiver_host
        self.blocked_by_both = blocked_by_both
        # Every message in this test is expected to violate maxMessageSize
        self.expect_block = True

        self.receiver_conn = None
        self.receiver = None
        self.n_rcvd = 0
        self.n_aborted = 0

    def on_start(self, event):
        self._start_timer_and_wait_for_network(event)
        self._generate_messages()

        self.logger.log("on_start: opening receiver connection to %s" % (self.receiver_host.addresses[0]))
        self.receiver_conn = event.container.connect(self.receiver_host.addresses[0])

        self.logger.log("on_start: Creating receiver")
        self.receiver = event.container.create_receiver(self.receiver_conn, self.test_address)

        self._open_sender(event)

    def on_message(self, event):
        self.logger.log("on_message: entry")
        if self.expect_block:
            # All messages should violate maxMessageSize.
            # Receiving any is an error.
            self.error = "Received a message. Expected to receive no messages."
            self.logger.log(self.error)
            self._shut_down_test()
        else:
            self.n_rcvd += 1
            self.accept(event.delivery)
            self._check_done()

    def on_aborted(self, event):
        self.logger.log("on_aborted")
        self.n_aborted += 1
        self._check_done()

    def _expect_reject(self):
        return self.expect_block

    def _close_receivers(self):
        if self.receiver:
            self.receiver.close()
            self.receiver = None

    def _close_receiver_connections(self):
        if self.receiver_conn:
            self.receiver_conn.close()
            self.receiver_conn = None

    def _current(self):
        return ("net_stable=%s sent=%d rcvd=%d rejected=%d aborted=%d connection_error:%d send_settled:%d" %
                (self.network_stable, self.n_sent, self.n_rcvd, self.n_rejected, self.n_aborted,
                 self.n_connection_error, self.n_send_settled))

    def _is_done(self):
        if not self.blocked_by_both:
            # Blocked by interior only. Connection to edge stays up
            # and all messages must be accounted for.
            return self.n_rejected == 1 and \
                self.n_send_settled == self.count
        # Blocked by interior and edge. Expect edge connection to go down
        # and some of our messages arrive at edge after it has sent
        # AMQP close. Those messages are never settled. TODO: Is that OK?
        return self.n_rejected == 1 and \
            self.n_connection_error == 1


#
# DISPATCH-975 Detect that an oversize message is blocked.
# These tests check simple and compound blocking for multicast messages.
#
# Indexes into router arrays for receivers and receiver stats
IDX_INTA = 0
IDX_INTB = 1
IDX_EA1 = 2
IDX_EB1 = 3

# Every router array index, in the order in which receivers are opened and closed
_ROUTER_INDEXES = (IDX_INTA, IDX_INTB, IDX_EA1, IDX_EB1)


class OversizeMulticastTransferTest(_OversizeTransferTestBase):
    """
    This test connects a sender and four receivers. Then it tries to send _count_
    number of messages of the given size through the router or router network.
    """
    def __init__(self, test_class, sender_host, routers, test_address, expect_receives,
                 blocked_by_ingress, blocked_by_interior,
                 message_size=100000, count=10, print_to_console=False):
        """
        Construct an instance of the multicast test
        :param test_class: test class - has wait-connection function
        :param sender_host: router for the sender connection
        :param routers: a list of all the routers for receiver connections
        :param test_address: sender/receiver AMQP address
        :param expect_receives: array of expected receive counts
        :param blocked_by_ingress: true if ingress router blocks
        :param blocked_by_interior: true if edge router messages also blocked by interior
        :param message_size: in bytes
        :param count: how many messages to send
        :param print_to_console: print logs as they happen
        """
        super(OversizeMulticastTransferTest, self).__init__(
            "OversizeMulticastTransferTest", test_class, sender_host, test_address,
            message_size, count, print_to_console)
        self.routers = routers
        self.expect_receives = expect_receives          # router array
        self.blocked_by_ingress = blocked_by_ingress
        self.blocked_by_interior = blocked_by_interior

        self.receiver_conns = [None, None, None, None]  # router array
        self.receivers = [None, None, None, None]       # router array
        self.n_rcvds = [0, 0, 0, 0]                     # router array
        self.n_aborteds = [0, 0, 0, 0]                  # router array

    def on_start(self, event):
        self._start_timer_and_wait_for_network(event)

        for idx in _ROUTER_INDEXES:
            self.logger.log("on_start: opening receiver connection to %s" % (self.routers[idx].addresses[0]))
            self.receiver_conns[idx] = event.container.connect(self.routers[idx].addresses[0])
        for idx in _ROUTER_INDEXES:
            self.logger.log("on_start: Creating receiver %d" % idx)
            self.receivers[idx] = event.container.create_receiver(self.receiver_conns[idx], self.test_address)

        self._generate_messages()
        self._open_sender(event)

    def rcvr_idx_of(self, rcvr):
        """
        Given a receiver, as in event.receiver, return
        the router array index of that receiver's router
        :param rcvr: receiver link to look up
        :return: integer index of receiver
        :raises Exception: if the receiver is not one of this test's receivers;
                the test is shut down before the exception is raised
        """
        for idx in _ROUTER_INDEXES:
            if rcvr == self.receivers[idx]:
                return idx
        self.error = "Receiver not found in receivers array."
        self.logger.log(self.error)
        self.logger.dump()
        self._shut_down_test()
        raise Exception(self.error)

    def on_message(self, event):
        self.logger.log("on_message")
        if self.shut_down:
            return
        idx = self.rcvr_idx_of(event.receiver)
        if self.expect_receives[idx] == 0:
            # Receiving any is an error.
            self.error = "Received a message. Expected to receive no messages."
            self.logger.log(self.error)
            self._shut_down_test()
        else:
            self.n_rcvds[idx] += 1
            self.accept(event.delivery)
            self._check_done()

    def on_aborted(self, event):
        self.logger.log("on_aborted")
        if self.shut_down:
            return
        idx = self.rcvr_idx_of(event.receiver)
        self.n_aborteds[idx] += 1
        self._check_done()

    def _expect_reject(self):
        # A rejection can only be legitimate when some router is configured
        # to block the messages.
        return bool(self.blocked_by_ingress or self.blocked_by_interior)

    def _close_receivers(self):
        for idx in _ROUTER_INDEXES:
            if self.receivers[idx]:
                self.receivers[idx].close()
                self.receivers[idx] = None

    def _close_receiver_connections(self):
        for idx in _ROUTER_INDEXES:
            if self.receiver_conns[idx]:
                self.receiver_conns[idx].close()
                self.receiver_conns[idx] = None

    def _current(self):
        return ("net_stable:%s sent=%d rcvd=%s rejected=%d aborted=%s connection_error:%d send_settled:%d" %
                (self.network_stable, self.n_sent, str(self.n_rcvds), self.n_rejected, str(self.n_aborteds),
                 self.n_connection_error, self.n_send_settled))

    def _is_done(self):
        if not self.blocked_by_interior:
            # Blocked by edge should never deliver to interior
            return self.n_rejected == 1 and \
                self.n_connection_error == 1
        if self.blocked_by_ingress:
            # Blocked by interior and edge. Expect edge connection to go down
            # and some of our messages arrive at edge after it has sent
            # AMQP close. Those messages are never settled. TODO: Is that OK?
            return self.n_rejected == 1 and \
                self.n_connection_error == 1
        # Blocked by interior only. Connection to edge stays up
        # and all messages must be accounted for.
        all_received = all(self.n_rcvds[idx] == self.expect_receives[idx]
                           for idx in _ROUTER_INDEXES
                           if self.expect_receives[idx] > 0)
        return self.n_rejected <= 1 and \
            self.n_send_settled == self.count and \
            all_received


class MaxMessageSizeBlockOversize(TestCase):
    """
    verify that maxMessageSize blocks oversize messages
    """
    @classmethod
    def setUpClass(cls):
        """Start the router"""
        super(MaxMessageSizeBlockOversize, cls).setUpClass()

        def router(name, mode, max_size, extra):
            """
            Start one router and append it to cls.routers.
            :param name: router id
            :param mode: router mode, 'interior' or 'edge' in this module
            :param max_size: policy maxMessageSize for this router
            :param extra: additional config entities, or None
            :return: the started router
            """
            config = [
                ('router', {'mode': mode,
                            'id': name,
                            'allowUnsettledMulticast': 'yes',
                            'workerThreads': W_THREADS}),
                ('listener', {'role': 'normal',
                              'port': cls.tester.get_port()}),
                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
                ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true',
                            'maxMessageSize': max_size, 'defaultVhost': '$default'}),
                # NOTE(review): this 'address' entity repeats the one above;
                # kept as found - confirm whether the duplicate is intended.
                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
                ('vhost', {'hostname': '$default', 'allowUnknownUser': 'true',
                           'groups': [(
                               '$default', {
                                   'users': '*',
                                   'maxConnections': 100,
                                   'remoteHosts': '*',
                                   'sources': '*',
                                   'targets': '*',
                                   'allowAnonymousSender': 'true',
                                   'allowWaypointLinks': 'true',
                                   'allowDynamicSource': 'true'
                               }
                           )]}
                 )
            ]

            if extra:
                config.extend(extra)
            config = Qdrouterd.Config(config)
            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
            return cls.routers[-1]

        # configuration:
        # two edge routers connected via 2 interior routers with max sizes
        #
        #  +-------+    +---------+    +---------+    +-------+
        #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
        #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
        #  +-------+    +---------+    +---------+    +-------+
        #
        # Note:
        #  * Messages whose senders connect to INT.A or INT.B are subject to max message size
        #    defined for the ingress router only.
        #  * Messages whose senders connect to EA1 or EB1 are subject to max message size
        #    defined for the ingress router. If the message is forwarded through the
        #    connected interior router then the message is subject to another max message size
        #    defined by the interior router.

        # The order in which routers are started must match the IDX_* constants.
        cls.routers = []

        interrouter_port = cls.tester.get_port()
        cls.INTA_edge_port = cls.tester.get_port()
        cls.INTB_edge_port = cls.tester.get_port()

        router('INT.A', 'interior', INTA_MAX_SIZE,
               [('listener', {'role': 'inter-router',
                              'port': interrouter_port}),
                ('listener', {'role': 'edge', 'port': cls.INTA_edge_port})])
        cls.INT_A = cls.routers[0]
        cls.INT_A.listener = cls.INT_A.addresses[0]

        router('INT.B', 'interior', INTB_MAX_SIZE,
               [('connector', {'name': 'connectorToA',
                               'role': 'inter-router',
                               'port': interrouter_port}),
                ('listener', {'role': 'edge',
                              'port': cls.INTB_edge_port})])
        cls.INT_B = cls.routers[1]
        cls.INT_B.listener = cls.INT_B.addresses[0]

        router('EA1', 'edge', EA1_MAX_SIZE,
               [('listener', {'name': 'rc', 'role': 'route-container',
                              'port': cls.tester.get_port()}),
                ('connector', {'name': 'uplink', 'role': 'edge',
                               'port': cls.INTA_edge_port})])
        cls.EA1 = cls.routers[2]
        cls.EA1.listener = cls.EA1.addresses[0]

        router('EB1', 'edge', EB1_MAX_SIZE,
               [('connector', {'name': 'uplink',
                               'role': 'edge',
                               'port': cls.INTB_edge_port,
                               'maxFrameSize': 1024}),
                ('listener', {'name': 'rc', 'role': 'route-container',
                              'port': cls.tester.get_port()})])
        cls.EB1 = cls.routers[3]
        cls.EB1.listener = cls.EB1.addresses[0]

        cls.wait_router_network_connected()

    @classmethod
    def wait_router_network_connected(cls):
        """Block until both interior routers see each other and both edge connectors are up."""
        cls.INT_A.wait_router_connected('INT.B')
        cls.INT_B.wait_router_connected('INT.A')
        cls.EA1.wait_connectors()
        cls.EB1.wait_connectors()

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
        """
        Run qdmanage with the given command string and return its output.
        :param cmd: qdmanage arguments as one space-separated string
        :param input: text passed to the process on stdin, or None
        :param expect: expected process exit status
        :param address: bus address; defaults to self.address()
        :return: combined stdout and stderr text
        :raises Exception: if process teardown fails; the output is appended to the message
        """
        p = self.popen(
            ['qdmanage'] +
            cmd.split(' ') +
            ['--bus',
             address or self.address(),
             '--indent=-1', '--timeout', str(TIMEOUT)],
            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
            universal_newlines=True)
        out = p.communicate(input)[0]
        try:
            p.teardown()
        except Exception as e:
            raise Exception("%s\n%s" % (e, out))
        return out

    def sense_n_closed_lines(self, routername):
        """
        Read a router's log file and count how many size-exceeded lines are in it.
        Lines that contain "<-" are counted as lines in and lines that
        contain "->" are counted as lines out.
        :param routername: router id; the log is read from ../setUpClass/<routername>.log
        :return: (int, int) tuple with counts of lines in and lines out
        """
        with open("../setUpClass/%s.log" % routername, 'r') as router_log:
            log_lines = router_log.read().split("\n")
        closed_lines = [s for s in log_lines if OVERSIZE_CONDITION_DESC in s]
        n_in = sum(1 for s in closed_lines if "<-" in s)
        n_out = sum(1 for s in closed_lines if "->" in s)
        return (n_in, n_out)

    def _run_transfer_test(self, test, test_name):
        """
        Run a transfer test and fail this test case if it finished with an error.
        :param test: OversizeMessageTransferTest or OversizeMulticastTransferTest instance
        :param test_name: short name used in the error log line
        """
        test.run()
        if test.error is not None:
            test.logger.log("%s test error: %s" % (test_name, test.error))
            test.logger.dump()
        self.assertTrue(test.error is None)

    def _verify_closed_lines(self, test, routername, before, expect_odelta, allowed_ideltas):
        """
        Verify how many size-exceeded lines were added to a router log by a test.
        :param test: the transfer test whose logger receives failure details
        :param routername: router whose log file is examined
        :param before: (in, out) counts from sense_n_closed_lines taken before the test ran
        :param expect_odelta: required growth of the lines-out count
        :param allowed_ideltas: acceptable values for growth of the lines-in count
        """
        ibefore, obefore = before
        iafter, oafter = self.sense_n_closed_lines(routername)
        idelta = iafter - ibefore
        odelta = oafter - obefore
        success = odelta == expect_odelta and idelta in allowed_ideltas
        if not success:
            test.logger.log("FAIL: N closed events in log file did not increment by 1. oBefore: %d, oAfter: %d, iBefore:%d, iAfter:%d" %
                            (obefore, oafter, ibefore, iafter))
            test.logger.dump()
        self.assertTrue(success, "Expected router to generate close with condition: message size exceeded")

    # verify that a message can go through an edge EB1 and get blocked by interior INT.B
    #
    #  +-------+    +---------+    +---------+    +-------+
    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
    #  +-------+    +---------+    +---------+    +-------+
    #                                   |             ^
    #                                   V             |
    #                               +--------+    +-------+
    #                               |receiver|    |sender |
    #                               |        |    |199,800|
    #                               +--------+    +-------+
    #
    def test_60_block_oversize_EB1_INTB_at_INTB(self):
        before = self.sense_n_closed_lines("EB1")
        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize,
                                           MaxMessageSizeBlockOversize.EB1,
                                           MaxMessageSizeBlockOversize.INT_B,
                                           "e60",
                                           message_size=EB1_MAX_SIZE - OVER_UNDER,
                                           print_to_console=False)
        self._run_transfer_test(test, "test_60")

        # Verify that interrouter link was shut down:
        # the EB1 log gains one size-exceeded line in and none out.
        self._verify_closed_lines(test, "EB1", before, expect_odelta=0, allowed_ideltas=(1,))

    # verify that a message can go through an edge EB1 and get blocked by interior INT.B
    # when the receiver is attached to the far edge router EA1
    #
    #  +-------+    +---------+    +---------+    +-------+
    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
    #  +-------+    +---------+    +---------+    +-------+
    #      |                                          ^
    #      V                                          |
    #  +--------+                                 +-------+
    #  |receiver|                                 |sender |
    #  |        |                                 |199,800|
    #  +--------+                                 +-------+
    #
    def test_61_block_oversize_EB1_EA1_at_INTB(self):
        before = self.sense_n_closed_lines("EB1")
        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize,
                                           MaxMessageSizeBlockOversize.EB1,
                                           MaxMessageSizeBlockOversize.EA1,
                                           "e61",
                                           message_size=EB1_MAX_SIZE - OVER_UNDER,
                                           print_to_console=False)
        self._run_transfer_test(test, "test_61")

        # Verify that interrouter link was shut down:
        # the EB1 log gains one size-exceeded line in and none out.
        self._verify_closed_lines(test, "EB1", before, expect_odelta=0, allowed_ideltas=(1,))

    # see what happens when a message must be blocked by edge and also by interior
    #
    #  +-------+    +---------+    +---------+    +-------+
    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
    #  +-------+    +---------+    +---------+    +-------+
    #                                   |             ^
    #                                   V             |
    #                               +--------+    +-------+
    #                               |receiver|    |sender |
    #                               |        |    |200,200|
    #                               +--------+    +-------+
    #
    def test_70_block_oversize_EB1_INTB_at_both(self):
        before = self.sense_n_closed_lines("EB1")
        test = OversizeMessageTransferTest(MaxMessageSizeBlockOversize,
                                           MaxMessageSizeBlockOversize.EB1,
                                           MaxMessageSizeBlockOversize.INT_B,
                                           "e70",
                                           message_size=EB1_MAX_SIZE + OVER_UNDER,
                                           blocked_by_both=True,
                                           print_to_console=False)
        self._run_transfer_test(test, "test_70")

        # Verify that interrouter link was shut down
        # EB1 must close connection to sender (odelta == 1) but
        # INT.B may or may not close the edge-interior link. Sometimes EB1 senses the
        # oversize condition before it has forwarded too many bytes of the first message
        # to INT.B. Then EB1 aborts the first message to INT.B and INT.B never
        # detects an oversize condition.
        self._verify_closed_lines(test, "EB1", before, expect_odelta=1, allowed_ideltas=(0, 1))

    # Verify that a multicast can go through an edge EB1 and get blocked by interior INT.B
    #
    #  +-------+    +---------+    +---------+    +-------+
    #  | rcvr  |    |  rcvr   |    |  rcvr   |    | rcvr  |
    #  |  no   |    |   no    |    |   no    |    |  yes  |
    #  +-------+    +---------+    +---------+    +-------+
    #      ^             ^              ^             ^
    #      |             |              |             |
    #  +-------+    +---------+    +---------+    +-------+
    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
    #  +-------+    +---------+    +---------+    +-------+
    #                                                 ^
    #                                                 |
    #                                             +-------+
    #                                             |sender |
    #                                             |199,800|
    #                                             +-------+
    #
    def test_80_block_multicast_EB1_INTB_at_INTB(self):
        before = self.sense_n_closed_lines("EB1")
        count = 10
        test = OversizeMulticastTransferTest(MaxMessageSizeBlockOversize,
                                             MaxMessageSizeBlockOversize.EB1,
                                             MaxMessageSizeBlockOversize.routers,
                                             "multicast/e80",
                                             [0, 0, 0, count],
                                             blocked_by_ingress=False,
                                             blocked_by_interior=True,
                                             message_size=EB1_MAX_SIZE - OVER_UNDER,
                                             count=count,
                                             print_to_console=False)
        self._run_transfer_test(test, "test_80")

        # Verify that interrouter link was shut down:
        # the EB1 log gains one size-exceeded line in and none out.
        self._verify_closed_lines(test, "EB1", before, expect_odelta=0, allowed_ideltas=(1,))

    # Verify that a multicast blocked by edge ingress goes to no receivers
    #
    #  +-------+    +---------+    +---------+    +-------+
    #  | rcvr  |    |  rcvr   |    |  rcvr   |    | rcvr  |
    #  |  no   |    |   no    |    |   no    |    |  no   |
    #  +-------+    +---------+    +---------+    +-------+
    #      ^             ^              ^             ^
    #      |             |              |             |
    #  +-------+    +---------+    +---------+    +-------+
    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
    #  +-------+    +---------+    +---------+    +-------+
    #      ^
    #      |
    #  +-------+
    #  |sender |
    #  | 50,200|
    #  +-------+
    #
    def test_81_block_multicast_EA1(self):
        before = self.sense_n_closed_lines("EA1")
        count = 10
        test = OversizeMulticastTransferTest(MaxMessageSizeBlockOversize,
                                             MaxMessageSizeBlockOversize.EA1,
                                             MaxMessageSizeBlockOversize.routers,
                                             "multicast/e81",
                                             [0, 0, 0, 0],
                                             blocked_by_ingress=True,
                                             blocked_by_interior=False,
                                             message_size=EA1_MAX_SIZE + OVER_UNDER,
                                             count=count,
                                             print_to_console=False)
        self._run_transfer_test(test, "test_81")

        # Verify that the ingress router EA1 generated the close:
        # the EA1 log gains one size-exceeded line out and none in.
        self._verify_closed_lines(test, "EA1", before, expect_odelta=1, allowed_ideltas=(0,))

    # Verify that a multicast blocked by interior ingress goes to no receivers
    #
    #  +-------+    +---------+    +---------+    +-------+
    #  | rcvr  |    |  rcvr   |    |  rcvr   |    | rcvr  |
    #  |  no   |    |   no    |    |   no    |    |  no   |
    #  +-------+    +---------+    +---------+    +-------+
    #      ^             ^              ^             ^
    #      |             |              |             |
    #  +-------+    +---------+    +---------+    +-------+
    #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
    #  | 50,000|    | 100,000 |    | 150,000 |    |200,000|
    #  +-------+    +---------+    +---------+    +-------+
    #                    ^
    #                    |
    #                +-------+
    #                |sender |
    #                |100,200|
    #                +-------+
    #
    def test_82_block_multicast_INTA(self):
        before = self.sense_n_closed_lines("INT.A")
        count = 10
        test = OversizeMulticastTransferTest(MaxMessageSizeBlockOversize,
                                             MaxMessageSizeBlockOversize.INT_A,
                                             MaxMessageSizeBlockOversize.routers,
                                             "multicast/e82",
                                             [0, 0, 0, 0],
                                             blocked_by_ingress=True,
                                             blocked_by_interior=False,
                                             message_size=INTA_MAX_SIZE + OVER_UNDER,
                                             count=count,
                                             print_to_console=False)
        self._run_transfer_test(test, "test_82")

        # Verify that the ingress router INT.A generated the close:
        # the INT.A log gains one size-exceeded line out and none in.
        self._verify_closed_lines(test, "INT.A", before, expect_odelta=1, allowed_ideltas=(0,))


if __name__ == '__main__':
    unittest.main(main_module())

# ---------------------------------------------------------------------
# Mailing-list footer retained from the archived commit message:
# To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
# For additional commands, e-mail: commits-h...@qpid.apache.org