This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new b8613b8 DISPATCH-1224 - Completed endpoint-initiated waypoints.
b8613b8 is described below
commit b8613b8a64353ee021e218c8c487a9871517966c
Author: Ted Ross <[email protected]>
AuthorDate: Fri Dec 14 17:38:08 2018 -0500
DISPATCH-1224 - Completed endpoint-initiated waypoints.
DISPATCH-1224 - Added a waypoint and multi-phase test.
DISPATCH-1224 - Replaced use of sprintf with explicit characters.
---
include/qpid/dispatch/amqp.h | 9 +
include/qpid/dispatch/router_core.h | 11 +
python/qpid_dispatch/management/qdrouter.json | 7 +
.../qpid_dispatch_internal/policy/policy_local.py | 6 +-
src/amqp.c | 19 +-
src/policy.c | 41 ++
src/policy.h | 1 +
src/router_core/connections.c | 2 +-
.../modules/address_lookup_client/lookup_client.c | 7 +-
src/router_core/modules/edge_router/addr_proxy.c | 53 ++-
src/router_core/route_control.c | 2 +
src/router_core/router_core.c | 4 +-
src/router_core/router_core_private.h | 1 +
src/router_core/terminus.c | 20 +
tests/CMakeLists.txt | 1 +
tests/system_tests_multi_phase.py | 411 +++++++++++++++++++++
16 files changed, 574 insertions(+), 21 deletions(-)
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 35f7c67..9f6be37 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -127,7 +127,16 @@ extern const char * const QD_CAPABILITY_ANONYMOUS_RELAY;
extern const char * const QD_CAPABILITY_ROUTER_CONTROL;
extern const char * const QD_CAPABILITY_ROUTER_DATA;
extern const char * const QD_CAPABILITY_EDGE_DOWNLINK;
+extern const char * const QD_CAPABILITY_WAYPOINT_DEFAULT;
extern const char * const QD_CAPABILITY_WAYPOINT1;
+extern const char * const QD_CAPABILITY_WAYPOINT2;
+extern const char * const QD_CAPABILITY_WAYPOINT3;
+extern const char * const QD_CAPABILITY_WAYPOINT4;
+extern const char * const QD_CAPABILITY_WAYPOINT5;
+extern const char * const QD_CAPABILITY_WAYPOINT6;
+extern const char * const QD_CAPABILITY_WAYPOINT7;
+extern const char * const QD_CAPABILITY_WAYPOINT8;
+extern const char * const QD_CAPABILITY_WAYPOINT9;
/// @}
/** @name Dynamic Node Properties */
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index a03db27..136ad6a 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -316,6 +316,17 @@ void qdr_terminus_add_capability(qdr_terminus_t *term,
const char *capability);
bool qdr_terminus_has_capability(qdr_terminus_t *term, const char *capability);
/**
+ * qdr_terminus_waypoint_capability
+ *
+ * If the terminus has a waypoint capability, return the ordinal of the
+ * waypoint. If not, return zero.
+ *
+ * @param term A qdr_terminus pointer returned by qdr_terminus()
+ * @return 1..9 if the terminus has waypoint capability, 0 otherwise
+ */
+int qdr_terminus_waypoint_capability(qdr_terminus_t *term);
+
+/**
* qdr_terminus_is_anonymous
*
* Indicate whether this terminus represents an anonymous endpoint.
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 5408c9b..180a0e0 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1868,6 +1868,13 @@
"required": false,
"create": true
},
+ "allowWaypointLinks": {
+ "type": "boolean",
+ "description": "Whether this connection is allowed to
claim 'waypoint.N' capability for attached links. This allows endpoints to act
as waypoints without needing auto-links.",
+ "default": true,
+ "required": false,
+ "create": true
+ },
"sources": {
"type": "string",
"description": "A list of source addresses from which
users in this group may receive messages. To specify multiple addresses,
separate the addresses with either a comma or a space. If you do not specify
any addresses, users in this group are not allowed to receive messages from any
addresses. You can use the substitution token '${user}' to specify an address
that contains a user's authenticated user name. You can use an asterisk ('*')
wildcard to match one or more ch [...]
diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py
index 9a5a1d6..2f3cfb4 100644
--- a/python/qpid_dispatch_internal/policy/policy_local.py
+++ b/python/qpid_dispatch_internal/policy/policy_local.py
@@ -70,6 +70,7 @@ class PolicyKeys(object):
KW_ALLOW_DYNAMIC_SRC = "allowDynamicSource"
KW_ALLOW_ANONYMOUS_SENDER = "allowAnonymousSender"
KW_ALLOW_USERID_PROXY = "allowUserIdProxy"
+ KW_ALLOW_WAYPOINT_LINKS = "allowWaypointLinks"
KW_SOURCES = "sources"
KW_TARGETS = "targets"
KW_SOURCE_PATTERN = "sourcePattern"
@@ -143,6 +144,7 @@ class PolicyCompiler(object):
PolicyKeys.KW_ALLOW_DYNAMIC_SRC,
PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER,
PolicyKeys.KW_ALLOW_USERID_PROXY,
+ PolicyKeys.KW_ALLOW_WAYPOINT_LINKS,
PolicyKeys.KW_SOURCES,
PolicyKeys.KW_TARGETS,
PolicyKeys.KW_SOURCE_PATTERN,
@@ -243,6 +245,7 @@ class PolicyCompiler(object):
policy_out[PolicyKeys.KW_ALLOW_DYNAMIC_SRC] = False
policy_out[PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER] = False
policy_out[PolicyKeys.KW_ALLOW_USERID_PROXY] = False
+ policy_out[PolicyKeys.KW_ALLOW_WAYPOINT_LINKS] = True
policy_out[PolicyKeys.KW_SOURCES] = ''
policy_out[PolicyKeys.KW_TARGETS] = ''
policy_out[PolicyKeys.KW_SOURCE_PATTERN] = ''
@@ -278,7 +281,8 @@ class PolicyCompiler(object):
policy_out[key] = val_out
elif key in [PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER,
PolicyKeys.KW_ALLOW_DYNAMIC_SRC,
- PolicyKeys.KW_ALLOW_USERID_PROXY
+ PolicyKeys.KW_ALLOW_USERID_PROXY,
+ PolicyKeys.KW_ALLOW_WAYPOINT_LINKS
]:
if isinstance(val, (PY_STRING_TYPE, PY_TEXT_TYPE)) and
val.lower() in ['true', 'false']:
val = True if val == 'true' else False
diff --git a/src/amqp.c b/src/amqp.c
index c1a0f11..9164d60 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -32,11 +32,20 @@ const int QD_MA_MAX_KEY_LEN = 16;
const int QD_MA_N_KEYS = 4; // max number of router annotations
to send/receive
const int QD_MA_FILTER_LEN = 5; // N tailing inbound entries to
search for stripping
-const char * const QD_CAPABILITY_ROUTER_CONTROL = "qd.router";
-const char * const QD_CAPABILITY_ROUTER_DATA = "qd.router-data";
-const char * const QD_CAPABILITY_EDGE_DOWNLINK = "qd.router-edge-downlink";
-const char * const QD_CAPABILITY_WAYPOINT1 = "qd.waypoint.1";
-const char * const QD_CAPABILITY_ANONYMOUS_RELAY = "ANONYMOUS-RELAY";
+const char * const QD_CAPABILITY_ROUTER_CONTROL = "qd.router";
+const char * const QD_CAPABILITY_ROUTER_DATA = "qd.router-data";
+const char * const QD_CAPABILITY_EDGE_DOWNLINK = "qd.router-edge-downlink";
+const char * const QD_CAPABILITY_WAYPOINT_DEFAULT = "qd.waypoint";
+const char * const QD_CAPABILITY_WAYPOINT1 = "qd.waypoint.1";
+const char * const QD_CAPABILITY_WAYPOINT2 = "qd.waypoint.2";
+const char * const QD_CAPABILITY_WAYPOINT3 = "qd.waypoint.3";
+const char * const QD_CAPABILITY_WAYPOINT4 = "qd.waypoint.4";
+const char * const QD_CAPABILITY_WAYPOINT5 = "qd.waypoint.5";
+const char * const QD_CAPABILITY_WAYPOINT6 = "qd.waypoint.6";
+const char * const QD_CAPABILITY_WAYPOINT7 = "qd.waypoint.7";
+const char * const QD_CAPABILITY_WAYPOINT8 = "qd.waypoint.8";
+const char * const QD_CAPABILITY_WAYPOINT9 = "qd.waypoint.9";
+const char * const QD_CAPABILITY_ANONYMOUS_RELAY = "ANONYMOUS-RELAY";
const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS = "x-opt-qd.address";
diff --git a/src/policy.c b/src/policy.c
index 636292e..910a93b 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -437,6 +437,7 @@ bool qd_policy_open_lookup_user(
settings->allowDynamicSource =
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSource", false);
}
settings->allowUserIdProxy =
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false);
+ settings->allowWaypointLinks =
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true);
if (settings->sources == 0) { //don't override if
configured by authz plugin
settings->sources =
qd_entity_get_string((qd_entity_t*)upolicy, "sources");
}
@@ -873,6 +874,24 @@ bool _qd_policy_approve_link_name_tree(const char
*username, const char *allowed
}
+static bool qd_policy_terminus_is_waypoint(pn_terminus_t *term)
+{
+ pn_data_t *cap = pn_terminus_capabilities(term);
+ if (cap) {
+ pn_data_rewind(cap);
+ pn_data_next(cap);
+ if (cap && pn_data_type(cap) == PN_SYMBOL) {
+ pn_bytes_t sym = pn_data_get_symbol(cap);
+ size_t len = strlen(QD_CAPABILITY_WAYPOINT_DEFAULT);
+ if (sym.size >= len && strncmp(sym.start,
QD_CAPABILITY_WAYPOINT_DEFAULT, len) == 0)
+ return true;
+ }
+ }
+
+ return false;
+}
+
+
bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t
*qd_conn)
{
const char *hostip = qd_connection_remote_ip(qd_conn);
@@ -897,6 +916,17 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t
*pn_link, qd_connection_t *qd_
bool lookup;
if (target && *target) {
// a target is specified
+ if (!qd_conn->policy_settings->allowWaypointLinks) {
+ bool waypoint =
qd_policy_terminus_is_waypoint(pn_link_remote_target(pn_link));
+ if (waypoint) {
+
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO,
+ "[%"PRIu64"]: DENY AMQP Attach sender link '%s' for
user '%s', rhost '%s', vhost '%s'. Waypoint capability not permitted",
+ qd_conn->connection_id, target, qd_conn->user_id,
hostip, vhost);
+ _qd_policy_deny_amqp_sender_link(pn_link, qd_conn,
QD_AMQP_COND_UNAUTHORIZED_ACCESS);
+ return false;
+ }
+ }
+
lookup = qd_policy_approve_link_name(qd_conn->user_id,
qd_conn->policy_settings, target, false);
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source,
(lookup ? QD_LOG_TRACE : QD_LOG_INFO),
@@ -959,6 +989,17 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t
*pn_link, qd_connection_t *q
const char * source =
pn_terminus_get_address(pn_link_remote_source(pn_link));
if (source && *source) {
// a source is specified
+ if (!qd_conn->policy_settings->allowWaypointLinks) {
+ bool waypoint =
qd_policy_terminus_is_waypoint(pn_link_remote_source(pn_link));
+ if (waypoint) {
+
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO,
+ "[%"PRIu64"]: DENY AMQP Attach receiver link '%s' for
user '%s', rhost '%s', vhost '%s'. Waypoint capability not permitted",
+ qd_conn->connection_id, source, qd_conn->user_id,
hostip, vhost);
+ _qd_policy_deny_amqp_sender_link(pn_link, qd_conn,
QD_AMQP_COND_UNAUTHORIZED_ACCESS);
+ return false;
+ }
+ }
+
bool lookup = qd_policy_approve_link_name(qd_conn->user_id,
qd_conn->policy_settings, source, true);
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source,
(lookup ? QD_LOG_TRACE : QD_LOG_INFO),
diff --git a/src/policy.h b/src/policy.h
index a1d415a..5f7af74 100644
--- a/src/policy.h
+++ b/src/policy.h
@@ -51,6 +51,7 @@ struct qd_policy__settings_s {
bool allowDynamicSource;
bool allowAnonymousSender;
bool allowUserIdProxy;
+ bool allowWaypointLinks;
char *sources;
char *targets;
char *sourcePattern;
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index a810a1e..d6ced98 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -387,7 +387,7 @@ qd_direction_t qdr_link_direction(const qdr_link_t *link)
int qdr_link_phase(const qdr_link_t *link)
{
- return link && link->auto_link ? link->auto_link->phase : 0;
+ return link ? link->phase : 0;
}
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index b457304..039387b 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -252,9 +252,10 @@ static qdr_address_t
*qdr_lookup_terminus_address_CT(qdr_core_t *core,
//
// If the terminus has a waypoint capability, override the configured
phases and use the waypoint phases.
//
- if (qdr_terminus_has_capability(terminus, QD_CAPABILITY_WAYPOINT1)) {
- in_phase = 1;
- out_phase = 0;
+ int waypoint_ordinal = qdr_terminus_waypoint_capability(terminus);
+ if (waypoint_ordinal > 0) {
+ in_phase = waypoint_ordinal;
+ out_phase = waypoint_ordinal - 1;
}
qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c
index a6c409e..c73dfc0 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -89,6 +89,41 @@ static qdr_terminus_t *qdr_terminus_normal(const char *addr)
}
+static void set_waypoint_capability(qdr_terminus_t *term, char phase_char,
qd_direction_t dir, int in_phase, int out_phase)
+{
+ int phase = (int) (phase_char - '0');
+
+ //
+ // For links that are outgoing on the in_phase or incoming on the
out_phase, don't set the
+ // waypoint capability. These links will behave like normal client links.
+ //
+ if ((dir == QD_OUTGOING && phase == in_phase) ||
+ (dir == QD_INCOMING && phase == out_phase))
+ return;
+
+ //
+ // If the phase is outside the range of in_phase..out_phase, don't do
anything. This is a
+ // misconfiguration.
+ //
+ if (phase < in_phase || phase > out_phase)
+ return;
+
+ //
+ // In all remaining cases, the new links are acting as waypoints.
+ //
+ int ordinal = phase + (dir == QD_OUTGOING ? 0 : 1);
+ char cap[16];
+ char suffix[3];
+
+ strcpy(cap, QD_CAPABILITY_WAYPOINT_DEFAULT);
+ suffix[0] = '.';
+ suffix[1] = '0' + ordinal;
+ suffix[2] = '\0';
+ strcat(cap, suffix);
+ qdr_terminus_add_capability(term, cap);
+}
+
+
static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key,
qdr_address_t *addr)
{
if (addr->edge_inlink == 0) {
@@ -96,13 +131,12 @@ static void add_inlink(qcm_edge_addr_proxy_t *ap, const
char *key, qdr_address_t
if (addr->config && addr->config->out_phase > 0) {
//
- // If this address is configured as multi-phase, check to see if
it is
- // an inlink on phase-0. If so, tell the Interior peer that this
is
- // for a waypoint.
+ // If this address is configured as multi-phase, we may need to
+ // add waypoint capabilities to the terminus.
//
const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle);
- if (key[0] == QD_ITER_HASH_PREFIX_MOBILE && key[1] == '0')
- qdr_terminus_add_capability(term, QD_CAPABILITY_WAYPOINT1);
+ if (key[0] == QD_ITER_HASH_PREFIX_MOBILE)
+ set_waypoint_capability(term, key[1], QD_INCOMING,
addr->config->in_phase, addr->config->out_phase);
}
qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn,
QD_LINK_ENDPOINT, QD_INCOMING,
@@ -136,13 +170,12 @@ static void add_outlink(qcm_edge_addr_proxy_t *ap, const
char *key, qdr_address_
if (addr->config && addr->config->out_phase > 0) {
//
- // If this address is configured as multi-phase, check to see if
it is
- // an outlink on phase-1. If so, tell the Interior peer that this
is
- // for a waypoint.
+ // If this address is configured as multi-phase, we may need to
+ // add waypoint capabilities to the terminus.
//
const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle);
- if (key[0] == QD_ITER_HASH_PREFIX_MOBILE && key[1] == '1')
- qdr_terminus_add_capability(term, QD_CAPABILITY_WAYPOINT1);
+ if (key[0] == QD_ITER_HASH_PREFIX_MOBILE)
+ set_waypoint_capability(term, key[1], QD_OUTGOING,
addr->config->in_phase, addr->config->out_phase);
}
qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn,
QD_LINK_ENDPOINT, QD_OUTGOING,
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index b843436..30cfb44 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -255,6 +255,7 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core,
qdr_auto_link_t *al, qdr
qdr_terminus_set_address(term, &key[2]); // truncate the "Mp"
annotation (where p = phase)
al->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT,
al->dir, source, target);
al->link->auto_link = al;
+ al->link->phase = al->phase;
al->state = QDR_AUTO_LINK_STATE_ATTACHING;
}
else {
@@ -288,6 +289,7 @@ static void qdr_auto_link_deactivate_CT(qdr_core_t *core,
qdr_auto_link_t *al, q
if (al->link) {
qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_NONE,
true);
al->link->auto_link = 0;
+ al->link->phase = 0;
al->link = 0;
}
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 3f21290..28649ad 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -453,12 +453,14 @@ void qdr_core_remove_address(qdr_core_t *core,
qdr_address_t *addr)
void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr,
qdr_link_t *link)
{
+ const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
link->owning_addr = addr;
+ if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE))
+ link->phase = (int) (key[1] - '0');
if (link->link_direction == QD_OUTGOING) {
qdr_add_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
if (DEQ_SIZE(addr->rlinks) == 1) {
- const char *key = (const char*)
qd_hash_key_by_handle(addr->hash_handle);
if (key && (*key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY || *key ==
QD_ITER_HASH_PREFIX_MOBILE))
qdr_post_mobile_added_CT(core, key, addr->treatment);
qdr_addr_start_inlinks_CT(core, addr);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 289614a..a2ebc8e 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -422,6 +422,7 @@ struct qdr_link_t {
int attach_count; ///< 1 or 2 depending on the
state of the lifecycle
int detach_count; ///< 0, 1, or 2 depending on
the state of the lifecycle
qdr_address_t *owning_addr; ///< [ref] Address record
that owns this link
+ int phase;
qdr_link_t *connected_link; ///< [ref] If this is a
link-route, reference the connected link
qdrc_endpoint_t *core_endpoint; ///< [ref] Set if this link
terminates on an in-core endpoint
qdr_link_ref_t *ref[QDR_LINK_LIST_CLASSES]; ///< Pointers to
containing reference objects
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index 1486148..58ec9e5 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -128,6 +128,26 @@ bool qdr_terminus_has_capability(qdr_terminus_t *term,
const char *capability)
}
+int qdr_terminus_waypoint_capability(qdr_terminus_t *term)
+{
+ pn_data_t *cap = term->capabilities;
+ pn_data_rewind(cap);
+ pn_data_next(cap);
+ if (cap && pn_data_type(cap) == PN_SYMBOL) {
+ pn_bytes_t sym = pn_data_get_symbol(cap);
+ size_t len = strlen(QD_CAPABILITY_WAYPOINT_DEFAULT);
+ if (sym.size >= len && strncmp(sym.start,
QD_CAPABILITY_WAYPOINT_DEFAULT, len) == 0) {
+ if (sym.size == len)
+ return 1; // This is the default ordinal
+ if (sym.size == len + 2 && sym.start[len + 1] > '0' &&
sym.start[len + 1] <= '9')
+ return (int) (sym.start[len + 1] - '0');
+ }
+ }
+
+ return 0;
+}
+
+
bool qdr_terminus_is_anonymous(qdr_terminus_t *term)
{
return term == 0 || (term->address == 0 && !term->dynamic);
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 9c68325..6eb0547 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -126,6 +126,7 @@ foreach(py_test_module
system_tests_priority
system_tests_core_client
system_tests_address_lookup
+ system_tests_multi_phase
)
add_test(${py_test_module} ${TEST_WRAP} -x unit2 -v ${py_test_module})
diff --git a/tests/system_tests_multi_phase.py b/tests/system_tests_multi_phase.py
new file mode 100644
index 0000000..27043d0
--- /dev/null
+++ b/tests/system_tests_multi_phase.py
@@ -0,0 +1,411 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+from time import sleep
+from threading import Event
+from threading import Timer
+
+import unittest2 as unittest
+from proton import Message, Timeout, symbol
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy
+from system_test import AsyncTestReceiver
+from system_test import AsyncTestSender
+from system_test import QdManager
+from system_tests_link_routes import ConnLinkRouteService
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+from proton.utils import BlockingConnection
+from qpid_dispatch.management.client import Node
+from subprocess import PIPE, STDOUT
+import re
+
+
+class AddrTimer(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ self.parent.check_address()
+
+
+class RouterTest(TestCase):
+
+ inter_router_port = None
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router"""
+ super(RouterTest, cls).setUpClass()
+
+ def router(name, mode, connection, extra=None):
+ config = [
+ ('router', {'mode': mode, 'id': name}),
+ ('listener', {'port': cls.tester.get_port(),
'stripAnnotations': 'no'}),
+ ('address', {'prefix': 'queue', 'waypoint': 'yes'}),
+ ('address', {'prefix': 'multi', 'ingressPhase': '0',
'egressPhase': '9'}),
+ connection
+ ]
+
+ if extra:
+ config.append(extra)
+ config = Qdrouterd.Config(config)
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+ cls.routers = []
+
+ inter_router_port = cls.tester.get_port()
+ edge_port_A = cls.tester.get_port()
+ edge_port_B = cls.tester.get_port()
+
+ router('INT.A', 'interior', ('listener', {'role': 'inter-router',
'port': inter_router_port}),
+ ('listener', {'role': 'edge', 'port': edge_port_A}))
+ router('INT.B', 'interior', ('connector', {'name': 'connectorToA',
'role': 'inter-router', 'port': inter_router_port}),
+ ('listener', {'role': 'edge', 'port': edge_port_B}))
+ router('EA1', 'edge', ('connector', {'name': 'edge', 'role':
'edge', 'port': edge_port_A}))
+ router('EA2', 'edge', ('connector', {'name': 'edge', 'role':
'edge', 'port': edge_port_A}))
+ router('EB1', 'edge', ('connector', {'name': 'edge', 'role':
'edge', 'port': edge_port_B}))
+ router('EB2', 'edge', ('connector', {'name': 'edge', 'role':
'edge', 'port': edge_port_B}))
+
+ cls.routers[0].wait_router_connected('INT.B')
+ cls.routers[1].wait_router_connected('INT.A')
+
+
+ def test_01_waypoint_same_interior(self):
+ test = WaypointTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[0],
+ self.routers[0].addresses[0],
+ 'queue.01')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_02_waypoint_same_edge(self):
+ test = WaypointTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ 'queue.02')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_03_waypoint_edge_interior(self):
+ test = WaypointTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[0].addresses[0],
+ 'queue.03')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_04_waypoint_interior_edge(self):
+ test = WaypointTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[0],
+ self.routers[2].addresses[0],
+ 'queue.04')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_05_waypoint_interior_interior(self):
+ test = WaypointTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ 'queue.05')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_06_waypoint_edge_edge(self):
+ test = WaypointTest(self.routers[2].addresses[0],
+ self.routers[5].addresses[0],
+ self.routers[0].addresses[0],
+ 'queue.06')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_07_waypoint_edge_endpoints_int_1(self):
+ test = WaypointTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ self.routers[2].addresses[0],
+ 'queue.07')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_08_waypoint_edge_endpoints_int_2(self):
+ test = WaypointTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ self.routers[5].addresses[0],
+ 'queue.08')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_09_waypoint_int_endpoints_edge_1(self):
+ test = WaypointTest(self.routers[2].addresses[0],
+ self.routers[5].addresses[0],
+ self.routers[0].addresses[0],
+ 'queue.09')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_10_waypoint_int_endpoints_edge_2(self):
+ test = WaypointTest(self.routers[2].addresses[0],
+ self.routers[5].addresses[0],
+ self.routers[1].addresses[0],
+ 'queue.10')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_11_waypoint_int_endpoints_int_1(self):
+ test = WaypointTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ self.routers[0].addresses[0],
+ 'queue.11')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_12_waypoint_int_endpoints_int_2(self):
+ test = WaypointTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ self.routers[1].addresses[0],
+ 'queue.12')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_13_waypoint_edge_endpoints_edge_1(self):
+ test = WaypointTest(self.routers[2].addresses[0],
+ self.routers[5].addresses[0],
+ self.routers[3].addresses[0],
+ 'queue.13')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_14_waypoint_edge_endpoints_edge_2(self):
+ test = WaypointTest(self.routers[2].addresses[0],
+ self.routers[5].addresses[0],
+ self.routers[4].addresses[0],
+ 'queue.14')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_15_multiphase_1(self):
+ test = MultiPhaseTest(self.routers[2].addresses[0],
+ self.routers[5].addresses[0],
+ [
+ self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[5].addresses[0],
+ self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ self.routers[2].addresses[0]
+ ],
+ 'multi.15')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_16_multiphase_2(self):
+ test = MultiPhaseTest(self.routers[2].addresses[0],
+ self.routers[5].addresses[0],
+ [
+ self.routers[5].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[1].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[0].addresses[0],
+ self.routers[5].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[1].addresses[0]
+ ],
+ 'multi.16')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_17_multiphase_3(self):
+ test = MultiPhaseTest(self.routers[1].addresses[0],
+ self.routers[0].addresses[0],
+ [
+ self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[5].addresses[0],
+ self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ self.routers[2].addresses[0]
+ ],
+ 'multi.17')
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+
+class Timeout(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ self.parent.timeout()
+
+
+class WaypointTest(MessagingHandler):
+ def __init__(self, sender_host, receiver_host, waypoint_host, addr):
+ super(WaypointTest, self).__init__()
+ self.sender_host = sender_host
+ self.receiver_host = receiver_host
+ self.waypoint_host = waypoint_host
+ self.addr = addr
+ self.count = 300
+
+ self.sender_conn = None
+ self.receiver_conn = None
+ self.waypoint_conn = None
+ self.error = None
+ self.n_tx = 0
+ self.n_rx = 0
+ self.n_thru = 0
+
+ def timeout(self):
+ self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_thru=%d" %
(self.n_tx, self.n_rx, self.n_thru)
+ self.sender_conn.close()
+ self.receiver_conn.close()
+ self.waypoint_conn.close()
+
+ def fail(self, error):
+ self.error = error
+ self.sender_conn.close()
+ self.receiver_conn.close()
+ self.waypoint_conn.close()
+ self.timer.cancel()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(10.0, Timeout(self))
+ self.sender_conn = event.container.connect(self.sender_host)
+ self.receiver_conn = event.container.connect(self.receiver_host)
+ self.waypoint_conn = event.container.connect(self.waypoint_host)
+ self.sender = event.container.create_sender(self.sender_conn,
self.addr)
+ self.receiver =
event.container.create_receiver(self.receiver_conn, self.addr)
+ self.wp_sender =
event.container.create_sender(self.waypoint_conn, self.addr)
+ self.wp_receiver =
event.container.create_receiver(self.waypoint_conn, self.addr)
+ self.wp_sender.target.capabilities.put_object(symbol("qd.waypoint"))
+ self.wp_receiver.source.capabilities.put_object(symbol("qd.waypoint"))
+
+ def on_sendable(self, event):
+ if event.sender == self.sender:
+ while self.sender.credit > 0 and self.n_tx < self.count:
+ self.sender.send(Message("Message %d" % self.n_tx))
+ self.n_tx += 1
+
+ def on_message(self, event):
+ if event.receiver == self.receiver:
+ self.n_rx += 1
+ if self.n_rx == self.count and self.n_thru == self.count:
+ self.fail(None)
+ elif event.receiver == self.wp_receiver:
+ self.n_thru += 1
+ self.wp_sender.send(Message(event.message.body))
+
+ def run(self):
+ Container(self).run()
+
+
+class MultiPhaseTest(MessagingHandler):
+ def __init__(self, sender_host, receiver_host, waypoint_hosts, addr):
+ super(MultiPhaseTest, self).__init__()
+ self.sender_host = sender_host
+ self.receiver_host = receiver_host
+ self.waypoint_hosts = waypoint_hosts
+ self.addr = addr
+ self.count = 300
+
+ self.sender_conn = None
+ self.receiver_conn = None
+ self.waypoint_conns = []
+ self.wp_senders = []
+ self.wp_receivers = []
+ self.error = None
+ self.n_tx = 0
+ self.n_rx = 0
+ self.n_thru = [0,0,0,0,0,0,0,0,0]
+
+ def timeout(self):
+ self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_thru=%r" %
(self.n_tx, self.n_rx, self.n_thru)
+ self.sender_conn.close()
+ self.receiver_conn.close()
+ for c in self.waypoint_conns:
+ c.close()
+
+ def fail(self, error):
+ self.error = error
+ self.sender_conn.close()
+ self.receiver_conn.close()
+ for c in self.waypoint_conns:
+ c.close()
+ self.timer.cancel()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(10.0, Timeout(self))
+ self.sender_conn = event.container.connect(self.sender_host)
+ self.receiver_conn = event.container.connect(self.receiver_host)
+ self.sender = event.container.create_sender(self.sender_conn,
self.addr)
+ self.receiver =
event.container.create_receiver(self.receiver_conn, self.addr)
+ for host in self.waypoint_hosts:
+ self.waypoint_conns.append(event.container.connect(host))
+
+ ordinal = 1
+ for conn in self.waypoint_conns:
+ sender = event.container.create_sender(conn, self.addr)
+ receiver = event.container.create_receiver(conn, self.addr)
+
+ sender.target.capabilities.put_object(symbol("qd.waypoint.%d" %
ordinal))
+ receiver.source.capabilities.put_object(symbol("qd.waypoint.%d" %
ordinal))
+
+ self.wp_senders.append(sender)
+ self.wp_receivers.append(receiver)
+ ordinal += 1
+
+ def on_sendable(self, event):
+ if event.sender == self.sender:
+ while self.sender.credit > 0 and self.n_tx < self.count:
+ self.sender.send(Message("Message %d" % self.n_tx))
+ self.n_tx += 1
+
+ def on_message(self, event):
+ if event.receiver == self.receiver:
+ self.n_rx += 1
+ if self.n_rx == self.count:
+ self.fail(None)
+ else:
+ idx = 0
+ for receiver in self.wp_receivers:
+ if event.receiver == receiver:
+ self.n_thru[idx] += 1
+ self.wp_senders[idx].send(Message(event.message.body))
+ return
+ idx += 1
+
+ def run(self):
+ Container(self).run()
+
+
+if __name__== '__main__':
+ unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]