Repository: qpid-dispatch
Updated Branches:
  refs/heads/master db87d5d3e -> 564c5907c


DISPATCH-1194: Link Route address lookup for edge router
This closes #423


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d201deac
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d201deac
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d201deac

Branch: refs/heads/master
Commit: d201deac4ec8854571dcd3acd1aa5aa39d16de5b
Parents: db87d5d
Author: Kenneth Giusti <[email protected]>
Authored: Wed Nov 28 09:30:09 2018 -0500
Committer: Ted Ross <[email protected]>
Committed: Tue Dec 4 09:34:47 2018 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/amqp.h                    |   5 +
 src/CMakeLists.txt                              |   1 +
 .../modules/address_lookup/address_lookup.c     | 454 +++++++++++++++++++
 .../modules/address_lookup/address_lookup.h     |  75 +++
 src/router_core/router_core_private.h           |   2 +-
 src/router_core/transfer.c                      |  45 +-
 tests/CMakeLists.txt                            |   1 +
 tests/system_test.py                            |   9 +-
 tests/system_tests_address_lookup.py            | 287 ++++++++++++
 9 files changed, 857 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/include/qpid/dispatch/amqp.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index e62b7f7..99315c7 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -184,5 +184,10 @@ extern const char * const QD_AMQP_COND_ILLEGAL_STATE;
 extern const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL;
 /// @};
 
+/** @name AMQP link endpoint role. */
+/// @{
+#define QD_AMQP_LINK_ROLE_SENDER   false
+#define QD_AMQP_LINK_ROLE_RECEIVER true
+/// @};
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ebc0e09..352aec7 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -109,6 +109,7 @@ set(qpid_dispatch_SOURCES
   router_core/modules/edge_router/edge_mgmt.c
   router_core/modules/test_hooks/core_test_hooks.c
   router_core/modules/edge_addr_tracking/edge_addr_tracking.c
+  router_core/modules/address_lookup/address_lookup.c
   router_node.c
   router_pynode.c
   schema_enum.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/modules/address_lookup/address_lookup.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup/address_lookup.c 
b/src/router_core/modules/address_lookup/address_lookup.c
new file mode 100644
index 0000000..93297ac
--- /dev/null
+++ b/src/router_core/modules/address_lookup/address_lookup.c
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "address_lookup.h"
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/amqp.h>
+#include "module.h"
+#include "core_link_endpoint.h"
+
+#include "inttypes.h"
+
+#define PROTOCOL_VERSION 1
+
+typedef enum {
+    // note: keep unit test in sync
+    OPCODE_INVALID,
+    OPCODE_LINK_ROUTE_LOOKUP,
+} address_lookup_opcode_t;
+
+
+/* create the message application properties and body for the link route lookup
+ * request message
+ */
+int qcm_link_route_lookup_msg(qd_iterator_t        *address,
+                              qd_direction_t        dir,
+                              qd_composed_field_t **properties,
+                              qd_composed_field_t **body)
+{
+    *properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
+    if (!*properties)
+        return -1;
+    qd_compose_start_map(*properties);
+    qd_compose_insert_string(*properties, "version");
+    qd_compose_insert_uint(*properties,   PROTOCOL_VERSION);
+    qd_compose_insert_string(*properties, "opcode");
+    qd_compose_insert_uint(*properties,   OPCODE_LINK_ROUTE_LOOKUP);
+    qd_compose_end_map(*properties);
+
+    *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+    if (!*body) {
+        qd_compose_free(*properties);
+        *properties = 0;
+        return -1;
+    }
+    qd_compose_start_list(*body);
+    qd_compose_insert_string_iterator(*body, address);
+    qd_compose_insert_bool(*body, (dir == QD_INCOMING
+                                   ? QD_AMQP_LINK_ROLE_RECEIVER
+                                   : QD_AMQP_LINK_ROLE_SENDER));
+    qd_compose_end_list(*body);
+    return 0;
+}
+
+
+/* parse a reply to the link route lookup request
+ */
+qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t 
*properties,
+                                                         qd_iterator_t *body,
+                                                         bool          
*is_link_route,
+                                                         bool          
*has_destinations)
+{
+    qcm_address_lookup_status_t rc = QCM_ADDR_LOOKUP_OK;
+    *is_link_route = false;
+    *has_destinations = false;
+
+    qd_parsed_field_t *props = qd_parse(properties);
+    if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props))
+        return QCM_ADDR_LOOKUP_INVALID_REQUEST;
+
+    qd_parsed_field_t *bod = qd_parse(body);
+    if (!bod || !qd_parse_ok(bod) || !qd_parse_is_list(bod)) {
+        qd_parse_free(props);
+        return QCM_ADDR_LOOKUP_INVALID_REQUEST;
+    }
+
+    qd_parsed_field_t *tmp = qd_parse_value_by_key(props, "status");
+    if (!tmp || !qd_parse_is_scalar(tmp)) {
+        rc = QCM_ADDR_LOOKUP_INVALID_REQUEST;
+        goto exit;
+    } else {
+        int32_t status = qd_parse_as_int(tmp);
+        if (status != QCM_ADDR_LOOKUP_OK) {
+            rc = (qcm_address_lookup_status_t) status;
+            goto exit;
+        }
+    }
+
+    // bod[0] == is_link_route (bool)
+    // bod[1] == has_destinations (bool)
+
+    if (qd_parse_sub_count(bod) < 2) {
+        rc = QCM_ADDR_LOOKUP_INVALID_REQUEST;
+        goto exit;
+    }
+
+    *is_link_route = qd_parse_as_bool(qd_parse_sub_value(bod, 0));
+    *has_destinations = qd_parse_as_bool(qd_parse_sub_value(bod, 1));
+
+exit:
+    qd_parse_free(props);
+    qd_parse_free(bod);
+    return rc;
+}
+
+
+typedef struct _endpoint_ref {
+    DEQ_LINKS(struct _endpoint_ref);
+    qdrc_endpoint_t *endpoint;
+    const char *container_id;
+} _endpoint_ref_t;
+DEQ_DECLARE(_endpoint_ref_t, _endpoint_ref_list_t);
+ALLOC_DEFINE(_endpoint_ref_t);
+
+
+static struct {
+    qdr_core_t           *core;
+    _endpoint_ref_list_t  endpoints;
+} _server_state;
+
+
+/* parse out the opcode from the request
+ */
+static address_lookup_opcode_t _decode_opcode(qd_parsed_field_t *properties)
+{
+    if (!properties)
+        return OPCODE_INVALID;
+    qd_parsed_field_t *oc = qd_parse_value_by_key(properties, "opcode");
+    if (!oc)
+        return OPCODE_INVALID;
+    uint32_t opcode = qd_parse_as_uint(oc);
+    if (!qd_parse_ok(oc))
+        return OPCODE_INVALID;
+    return (address_lookup_opcode_t)opcode;
+}
+
+
+/* send a reply to a lookup request
+ */
+static uint64_t _send_reply(_endpoint_ref_t             *epr,
+                            address_lookup_opcode_t      opcode,
+                            qcm_address_lookup_status_t  status,
+                            qd_iterator_t               *correlation_id,
+                            qd_iterator_t               *reply_to,
+                            qd_composed_field_t         *body)
+{
+    if (!correlation_id || !reply_to) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Link route address reply failed - invalid request message 
properties"
+               " (container=%s, endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        return PN_REJECTED;
+    }
+
+    qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(fld);
+    qd_compose_insert_bool(fld, 0);     // durable
+    qd_compose_end_list(fld);
+
+    fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, fld);
+    qd_compose_start_list(fld);
+    qd_compose_insert_null(fld);                    // message-id
+    qd_compose_insert_null(fld);                    // user-id
+    qd_compose_insert_typed_iterator(fld, reply_to); // to
+    qd_compose_insert_null(fld);                    // subject
+    qd_compose_insert_null(fld);                    // reply-to
+    qd_compose_insert_typed_iterator(fld, correlation_id);
+    qd_compose_end_list(fld);
+
+    fld = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, fld);
+    qd_compose_start_map(fld);
+    qd_compose_insert_string(fld, "version");
+    qd_compose_insert_uint(fld,   PROTOCOL_VERSION);
+    qd_compose_insert_string(fld, "opcode");
+    qd_compose_insert_uint(fld,   opcode);
+    qd_compose_insert_string(fld, "status");
+    qd_compose_insert_uint(fld,   status);
+    qd_compose_end_map(fld);
+
+    qd_message_t *msg = qd_message();
+
+    qd_message_compose_3(msg, fld, body);
+    qdr_in_process_send_to_CT(_server_state.core, reply_to, msg, true, false);
+    qd_message_free(msg);
+    qd_compose_free(fld);
+
+    return PN_ACCEPTED;
+}
+
+
+/* perform a link route lookup
+ */
+static uint64_t _do_link_route_lookup(_endpoint_ref_t   *epr,
+                                      qd_parsed_field_t *body,
+                                      qd_iterator_t     *reply_to,
+                                      qd_iterator_t     *cid)
+{
+    if (!body || !qd_parse_ok(body) || qd_parse_sub_count(body) < 2) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Link route address lookup failed - invalid request body"
+               " (container=%s, endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        return PN_REJECTED;
+    }
+
+    //
+    // body[0] == fully qualified address (string)
+    // body[1] == direction (bool, true == receiver)
+    //
+
+    qd_iterator_t *addr_i = qd_parse_raw(qd_parse_sub_value(body, 0));
+    qd_direction_t dir = (qd_parse_as_bool(qd_parse_sub_value(body, 1))
+                          ? QD_INCOMING : QD_OUTGOING);
+
+    bool is_link_route = false;
+    bool has_destinations = false;
+    qdr_address_t *addr = 0;
+    qd_iterator_reset_view(addr_i, ITER_VIEW_ALL);
+    qd_parse_tree_retrieve_match(_server_state.core->link_route_tree[dir], 
addr_i, (void**) &addr);
+    if (addr) {
+        is_link_route = true;
+        has_destinations = !!(DEQ_SIZE(addr->conns) || DEQ_SIZE(addr->rlinks) 
|| qd_bitmask_cardinality(addr->rnodes));
+    }
+
+    // out_body[0] == is_link_route (bool)
+    // out_body[1] == has_destinations (bool)
+
+    qd_composed_field_t *out_body = 
qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+    qd_compose_start_list(out_body);
+    qd_compose_insert_bool(out_body, is_link_route);
+    qd_compose_insert_bool(out_body, has_destinations);
+    qd_compose_end_list(out_body);
+
+    uint64_t rc = _send_reply(epr,
+                              OPCODE_LINK_ROUTE_LOOKUP,
+                              addr ? QCM_ADDR_LOOKUP_OK : 
QCM_ADDR_LOOKUP_NOT_FOUND,
+                              cid,
+                              reply_to,
+                              out_body);
+    qd_compose_free(out_body);
+
+    if (qd_log_enabled(_server_state.core->log, QD_LOG_TRACE)) {
+        char *as = (char *)qd_iterator_copy(addr_i);
+        qd_log(_server_state.core->log, QD_LOG_TRACE,
+               "Link route address lookup on %s - %sfound is link route=%s 
has_destinations=%s"
+               " (container=%s, endpoint=%p)",
+               as,
+               (addr) ? "" : "not ",
+               is_link_route ? "yes" : "no",
+               has_destinations ? "yes" : "no",
+               epr->container_id,
+               (void *)epr->endpoint);
+        free(as);
+    }
+    return rc;
+}
+
+
+/* handle lookup request from client
+ */
+void _on_transfer(void           *link_context,
+                  qdr_delivery_t *delivery,
+                  qd_message_t   *message)
+{
+    if (!qd_message_receive_complete(message))
+        return;
+
+    _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context;
+    qd_log(_server_state.core->log, QD_LOG_TRACE,
+           "Address lookup request received (container=%s, endpoint=%p)",
+           epr->container_id, (void *)epr->endpoint);
+
+    uint64_t disposition = PN_ACCEPTED;
+    qd_iterator_t *p_iter = qd_message_field_iterator(message, 
QD_FIELD_APPLICATION_PROPERTIES);
+    qd_parsed_field_t *props = qd_parse(p_iter);
+    if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - no properties (container=%s, 
endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        disposition = PN_REJECTED;
+        goto exit;
+    }
+
+    qd_parsed_field_t *v = qd_parse_value_by_key(props, "version");
+    if (!v) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - no version (container=%s, 
endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        disposition = PN_REJECTED;
+        goto exit;
+    }
+
+    uint32_t version = qd_parse_as_uint(v);
+    if (!qd_parse_ok(v)) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - invalid version 
(container=%s, endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        disposition = PN_REJECTED;
+        goto exit;
+    }
+
+    if (version != PROTOCOL_VERSION) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - unknown version"
+               " (container=%s, endpoint=%p, version=%"PRIu32")",
+               epr->container_id, (void *)epr->endpoint, version);
+        disposition = PN_REJECTED;
+        goto exit;
+        // @TODO(kgiusti) send reply with status QCM_ADDR_LOOKUP_BAD_VERSION
+    }
+
+    address_lookup_opcode_t opcode = _decode_opcode(props);
+    switch (opcode) {
+    case OPCODE_LINK_ROUTE_LOOKUP: {
+        qd_iterator_t *b_iter = qd_message_field_iterator(message, 
QD_FIELD_BODY);
+        qd_parsed_field_t *body = qd_parse(b_iter);
+        qd_iterator_t *reply_to = qd_message_field_iterator_typed(message, 
QD_FIELD_REPLY_TO);
+        qd_iterator_t *cid = qd_message_field_iterator_typed(message, 
QD_FIELD_CORRELATION_ID);
+        disposition = _do_link_route_lookup(epr, body, reply_to, cid);
+        qd_iterator_free(cid);
+        qd_iterator_free(reply_to);
+        qd_parse_free(body);
+        qd_iterator_free(b_iter);
+        break;
+    }
+    case OPCODE_INVALID:
+    default:
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - invalid opcode"
+               " (container=%s, endpoint=%p, opcode=%d)",
+               epr->container_id, (void *)epr->endpoint, opcode);
+        disposition = PN_REJECTED;
+    }
+
+exit:
+    qd_parse_free(props);
+    qd_iterator_free(p_iter);
+    qdrc_endpoint_settle_CT(_server_state.core, delivery, disposition);
+    qdrc_endpoint_flow_CT(_server_state.core, epr->endpoint, 1, false);
+    return;
+}
+
+
+/* handle incoming attach to address lookup service
+ */
+static void _on_first_attach(void            *bind_context,
+                             qdrc_endpoint_t *endpoint,
+                             void            **link_context,
+                             qdr_terminus_t  *remote_source,
+                             qdr_terminus_t  *remote_target)
+{
+    //
+    // Only accept incoming links initiated by the edge router. Detach all
+    // other links
+    //
+    qdr_connection_t *conn = qdrc_endpoint_get_connection_CT(endpoint);
+    if (qdrc_endpoint_get_direction_CT(endpoint) != QD_INCOMING ||
+        conn->role != QDR_ROLE_EDGE_CONNECTION) {
+        *link_context = 0;
+        qdrc_endpoint_detach_CT(_server_state.core, endpoint, 0);
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Attempt to attach to address lookup server rejected 
(container=%s)",
+               (conn->connection_info) ? conn->connection_info->container : 
"<unknown>");
+        return;
+    }
+
+    _endpoint_ref_t *epr = new__endpoint_ref_t();
+    ZERO(epr);
+    epr->endpoint = endpoint;
+    epr->container_id = (conn->connection_info) ? 
conn->connection_info->container : "<unknown>";
+    DEQ_INSERT_TAIL(_server_state.endpoints, epr);
+    *link_context = epr;
+    qdrc_endpoint_second_attach_CT(_server_state.core, endpoint, 
remote_source, remote_target);
+    qdrc_endpoint_flow_CT(_server_state.core, endpoint, 1, false);
+
+    qd_log(_server_state.core->log, QD_LOG_TRACE,
+           "Client attached to address lookup server (container=%s, 
endpoint=%p)",
+           epr->container_id, (void *)endpoint);
+}
+
+
+/* handle incoming detach from client
+ */
+static void _on_first_detach(void *link_context,
+                             qdr_error_t *error)
+{
+    _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context;
+    qdrc_endpoint_detach_CT(_server_state.core, epr->endpoint, 0);
+    DEQ_REMOVE(_server_state.endpoints, epr);
+    qd_log(_server_state.core->log, QD_LOG_TRACE,
+           "Client detached from address lookup server (container=%s, 
endpoint=%p)",
+           epr->container_id, (void *)epr->endpoint);
+    free__endpoint_ref_t(epr);
+}
+
+
+static qdrc_endpoint_desc_t _endpoint_handlers =
+{
+    .label = "address lookup",
+    .on_first_attach = _on_first_attach,
+    .on_transfer = _on_transfer,
+    .on_first_detach = _on_first_detach,
+};
+
+
+static void _address_lookup_init_CT(qdr_core_t *core, void **module_context)
+{
+    //
+    // Address resolution service is provided by interior routers only
+    //
+    if (core->router_mode != QD_ROUTER_MODE_INTERIOR)
+        return;
+
+    _server_state.core = core;
+
+    //
+    // Handle any incoming links to the QD_TERMINUS_ADDRESS_LOOKUP address
+    //
+    qdrc_endpoint_bind_mobile_address_CT(core,
+                                         QD_TERMINUS_ADDRESS_LOOKUP,
+                                         '0', // phase
+                                         &_endpoint_handlers,
+                                         &_server_state);
+    *module_context = &_server_state;
+}
+
+
+static void _address_lookup_final_CT(void *module_context)
+{
+    _endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints);
+    while (epr) {
+        DEQ_REMOVE_HEAD(_server_state.endpoints);
+        free__endpoint_ref_t(epr);
+        epr = DEQ_HEAD(_server_state.endpoints);
+    }
+}
+
+
+QDR_CORE_MODULE_DECLARE("address lookup", _address_lookup_init_CT, 
_address_lookup_final_CT)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/modules/address_lookup/address_lookup.h
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup/address_lookup.h 
b/src/router_core/modules/address_lookup/address_lookup.h
new file mode 100644
index 0000000..9f8b65e
--- /dev/null
+++ b/src/router_core/modules/address_lookup/address_lookup.h
@@ -0,0 +1,75 @@
+#ifndef router_core_address_lookup_h
+#define router_core_address_lookup_h 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/compose.h>
+//
+// API for building address lookup request messages.  The message properties
+// and body fields are handled separately so they can be passed directly to the
+// core client API.
+//
+
+typedef enum {
+    // note: keep unit test in sync
+    QCM_ADDR_LOOKUP_OK,
+    QCM_ADDR_LOOKUP_BAD_VERSION,
+    QCM_ADDR_LOOKUP_BAD_OPCODE,
+    QCM_ADDR_LOOKUP_NOT_FOUND,
+    QCM_ADDR_LOOKUP_INVALID_REQUEST,
+} qcm_address_lookup_status_t;
+
+
+/**
+ * Create the message properties and body for a link route address lookup.  The
+ * returned properties and body can be passed directly to
+ * qdrc_client_request_CT().
+ *
+ * @param address - fully qualified link route address to look up.
+ * @param dir - QD_INCOMING or QD_OUTGOING
+ * @param properties - return value for message application properties section
+ * @param body - return value for message body
+ * @return zero on success
+ */
+int qcm_link_route_lookup_request(qd_iterator_t        *address,
+                                  qd_direction_t        dir,
+                                  qd_composed_field_t **properties,
+                                  qd_composed_field_t **body);
+
+
+/**
+ * Parse out the payload of the link route lookup reply message.  The
+ * properties and body fields are provided by the on_reply_cb() callback passed
+ * to the qdrc_client_request_CT() call.
+ *
+ * @param properties - application properties as returned in the reply
+ * @param body - body from reply message
+ * @param is_link_route - set True if the address is a link route address that
+ *        exists in the route tables of the queried router.
+ * @param has_destination - if is_link_route this indicates whether or not the
+ *        queried router has active destinations for this link route.
+ * @return QCM_ADDR_LOOKUP_OK if query is successful, else and error code
+ */
+qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t 
*properties,
+                                                         qd_iterator_t *body,
+                                                         bool          
*is_link_route,
+                                                         bool          
*has_destinations);
+#endif // router_core_address_lookup_h

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index 91d0016..0b70c97 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -857,7 +857,7 @@ void qdr_delivery_failed_CT(qdr_core_t *core, 
qdr_delivery_t *delivery);
 bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery, const 
char *label);
 void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, 
qdr_link_t *link, qd_message_t *msg);
-
+void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, 
qd_message_t *msg, bool exclude_inprocess, bool control);
 /**
  * Links the in_dlv to the out_dlv and increments ref counts of both deliveries
  */

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 6d9206d..1cce8de 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -1061,27 +1061,36 @@ static void qdr_link_deliver_CT(qdr_core_t *core, 
qdr_action_t *action, bool dis
 
 static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard)
 {
-    qdr_field_t  *addr_field = action->args.io.address;
-    qd_message_t *msg        = action->args.io.message;
-
     if (!discard) {
-        qdr_address_t *addr = 0;
-
-        qd_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH);
-        qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) 
&addr);
-        if (addr) {
-            //
-            // Forward the message.  We don't care what the fanout count is.
-            //
-            (void) qdr_forward_message_CT(core, addr, msg, 0, 
action->args.io.exclude_inprocess,
-                                          action->args.io.control);
-            addr->deliveries_from_container++;
-        } else
-            qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown 
address");
+        qdr_in_process_send_to_CT(core,
+                                  qdr_field_iterator(action->args.io.address),
+                                  action->args.io.message,
+                                  action->args.io.exclude_inprocess,
+                                  action->args.io.control);
     }
 
-    qdr_field_free(addr_field);
-    qd_message_free(msg);
+    qdr_field_free(action->args.io.address);
+    qd_message_free(action->args.io.message);
+}
+
+
+/**
+ * forward an in-process message based on the destination address
+ */
+void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, 
qd_message_t *msg, bool exclude_inprocess, bool control)
+{
+    qdr_address_t *addr = 0;
+
+    qd_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
+    qd_hash_retrieve(core->addr_hash, address, (void**) &addr);
+    if (addr) {
+        //
+        // Forward the message.  We don't care what the fanout count is.
+        //
+        (void) qdr_forward_message_CT(core, addr, msg, 0, exclude_inprocess, 
control);
+        addr->deliveries_from_container++;
+    } else
+        qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown 
address");
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 51dc438..45206e8 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -123,6 +123,7 @@ foreach(py_test_module
     ${CONSOLE_TEST}
     system_tests_priority
     system_tests_core_client
+    system_tests_address_lookup
     )
 
   add_test(${py_test_module} ${TEST_WRAP} -x unit2 -v ${py_test_module})

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index d268671..d4cd362 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -436,11 +436,12 @@ class Qdrouterd(Process):
         except:
             return False
 
-    def wait_address(self, address, subscribers=0, remotes=0, **retry_kwargs):
+    def wait_address(self, address, subscribers=0, remotes=0, containers=0, 
**retry_kwargs):
         """
         Wait for an address to be visible on the router.
         @keyword subscribers: Wait till subscriberCount >= subscribers
         @keyword remotes: Wait till remoteCount >= remotes
+        @keyword containers: Wait till containerCount >= remotes
         @param retry_kwargs: keyword args for L{retry}
         """
         def check():
@@ -449,11 +450,13 @@ class Qdrouterd(Process):
             # endswith check is because of M0/L/R prefixes
             addrs = self.management.query(
                 type='org.apache.qpid.dispatch.router.address',
-                attribute_names=[u'name', u'subscriberCount', 
u'remoteCount']).get_entities()
+                attribute_names=[u'name', u'subscriberCount', u'remoteCount', 
u'containerCount']).get_entities()
 
             addrs = [a for a in addrs if a['name'].endswith(address)]
 
-            return addrs and addrs[0]['subscriberCount'] >= subscribers and 
addrs[0]['remoteCount'] >= remotes
+            return (addrs and addrs[0]['subscriberCount'] >= subscribers
+                    and addrs[0]['remoteCount'] >= remotes
+                    and addrs[0]['containerCount'] >= containers)
         assert retry(check, **retry_kwargs)
 
     def get_host(self, protocol_family):

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/system_tests_address_lookup.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_address_lookup.py 
b/tests/system_tests_address_lookup.py
new file mode 100644
index 0000000..3538f93
--- /dev/null
+++ b/tests/system_tests_address_lookup.py
@@ -0,0 +1,287 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest2 as unittest
+import time
+
+from system_test import TestCase, Qdrouterd, TIMEOUT
+from system_test import AsyncTestReceiver
+from test_broker import FakeBroker
+
+from proton import Disposition
+from proton import Message
+from proton.utils import BlockingConnection
+from proton.utils import SyncRequestResponse
+from proton.utils import SendException
+from proton.utils import LinkDetached
+
+
+class LinkRouteLookupTest(TestCase):
+    """
+    Tests link route address lookup
+    """
+    # hardcoded values from the router's C code
+    QD_TERMINUS_ADDRESS_LOOKUP = '_$qd.addr_lookup'
+    PROTOCOL_VERSION = 1
+    OPCODE_LINK_ROUTE_LOOKUP = 1
+    QCM_ADDR_LOOKUP_OK = 0
+    QCM_ADDR_LOOKUP_NOT_FOUND = 3
+
+    def _check_response(self, message):
+        self.assertTrue(isinstance(message.properties, dict))
+        self.assertEqual(self.PROTOCOL_VERSION, 
message.properties.get('version'))
+        self.assertTrue(message.properties.get('opcode') is not None)
+        self.assertTrue(isinstance(message.body, list))
+        self.assertEqual(2, len(message.body))
+        return (message.properties.get('status'),
+                message.body[0],  # is link_route?
+                message.body[1])  # has destinations?
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(LinkRouteLookupTest, cls).setUpClass()
+
+        def router(name, mode, extra=None):
+            config = [
+                ('router', {'mode': mode, 'id': name}),
+                ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': 
cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'})
+            ]
+
+            if extra:
+                config.extend(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
+
+        cls.routers = []
+
+        inter_router_port = cls.tester.get_port()
+        edge_port_A       = cls.tester.get_port()
+        broker_port_A     = cls.tester.get_port()
+        broker_port_B     = cls.tester.get_port()
+
+        router('INT.A', 'interior',
+               [
+                   ('listener', {'role': 'edge',
+                                 'port': cls.tester.get_port()}),
+                   ('listener', {'role': 'inter-router',
+                                 'port': inter_router_port}),
+                   ('connector', {'role': 'route-container',
+                                  'port': cls.tester.get_port()}),
+
+                   ('linkRoute', {'pattern': 'org.apache.A.#',
+                                  'containerId': 'FakeBrokerA',
+                                  'direction': 'in'}),
+                   ('linkRoute', {'pattern': 'org.apache.A.#',
+                                  'containerId': 'FakeBrokerA',
+                                  'direction': 'out'})
+               ])
+        cls.INT_A = cls.routers[-1]
+        cls.INT_A.listener = cls.INT_A.addresses[0]
+        cls.INT_A.edge_listener = cls.INT_A.addresses[1]
+        cls.INT_A.inter_router_listener = cls.INT_A.addresses[2]
+        cls.INT_A.broker_connector = cls.INT_A.connector_addresses[0]
+
+        router('INT.B', 'interior',
+               [
+                   ('listener', {'role': 'edge',
+                                 'port': cls.tester.get_port()}),
+                   ('connector', {'role': 'inter-router',
+                                  'name': 'connectorToA',
+                                  'port': inter_router_port}),
+                   ('connector', {'role': 'route-container',
+                                  'port': cls.tester.get_port()}),
+
+                   ('linkRoute', {'pattern': 'org.apache.B.#',
+                                  'containerId': 'FakeBrokerB',
+                                  'direction': 'in'}),
+                   ('linkRoute', {'pattern': 'org.apache.B.#',
+                                  'containerId': 'FakeBrokerB',
+                                  'direction': 'out'})
+                ])
+        cls.INT_B = cls.routers[-1]
+        cls.INT_B.edge_listener = cls.INT_B.addresses[1]
+        cls.INT_B.broker_connector = cls.INT_B.connector_addresses[1]
+
+        cls.INT_A.wait_router_connected('INT.B')
+        cls.INT_B.wait_router_connected('INT.A')
+
+    def _lookup_request(self, lr_address, direction):
+        """
+        Construct a link route lookup request message
+        """
+        return Message(body=[lr_address,
+                             direction],
+                       properties={"version": self.PROTOCOL_VERSION,
+                                   "opcode": self.OPCODE_LINK_ROUTE_LOOKUP})
+
+    def test_link_route_lookup_ok(self):
+        """
+        verify a link route address can be looked up successfully for both
+        locally attached and remotely attached containers
+        """
+
+        # fire up a fake broker attached to the router local to the edge router
+        fb = FakeBroker(self.INT_A.broker_connector, 
container_id='FakeBrokerA')
+        self.INT_A.wait_address("org.apache.A", containers=1)
+
+        # create a fake edge and lookup the target address
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+
+        for direction in [True, False]:
+            # True = direction inbound (receiver) False = direction outbound 
(sender)
+            rsp = 
self._check_response(srr.call(self._lookup_request("org.apache.A.foo", 
direction)))
+            self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+            self.assertTrue(rsp[1])
+            self.assertTrue(rsp[2])
+
+        # shutdown fake router
+        fb.join()
+
+        # now fire up a fake broker attached to the remote router
+        fb = FakeBroker(self.INT_B.broker_connector, 
container_id='FakeBrokerB')
+        self.INT_A.wait_address("org.apache.B", remotes=1)
+
+        for direction in [True, False]:
+            rsp = 
self._check_response(srr.call(self._lookup_request("org.apache.B.foo", 
direction)))
+            self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+            self.assertTrue(rsp[1])
+            self.assertTrue(rsp[2])
+
+        fb.join()
+        bc.close()
+
+    def test_link_route_lookup_not_found(self):
+        """
+        verify correct handling of lookup misses
+        """
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+
+        rsp = 
self._check_response(srr.call(self._lookup_request("not.found.address", True)))
+        self.assertEqual(self.QCM_ADDR_LOOKUP_NOT_FOUND, rsp[0])
+
+    def test_link_route_lookup_not_link_route(self):
+        """
+        verify correct handling of matches to mobile addresses
+        """
+        addr = "not.a.linkroute"
+        client = AsyncTestReceiver(self.INT_A.listener, addr)
+        self.INT_A.wait_address(addr)
+
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+
+        rsp = self._check_response(srr.call(self._lookup_request(addr, True)))
+        # self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+        self.assertEqual(False, rsp[1])
+        bc.close()
+        client.stop()
+
+    def test_link_route_lookup_no_dest(self):
+        """
+        verify correct handling of matches to mobile addresses
+        """
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+        rsp = 
self._check_response(srr.call(self._lookup_request("org.apache.A.nope", True)))
+        self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+        self.assertEqual(True, rsp[1])
+        self.assertEqual(False, rsp[2])
+        bc.close()
+
+    def _invalid_request_test(self, msg, disposition=Disposition.REJECTED):
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+        # @TODO(kgiusti) - self.assertRaises does not work here:
+        try:
+            srr.call(msg)
+            self.assertTrue(False)
+        except SendException as exc:
+            self.assertEqual(disposition, exc.state)
+        bc.close()
+
+    def test_link_route_invalid_request(self):
+        """
+        Test various invalid message content
+        """
+
+        # empty message
+        self._invalid_request_test(Message())
+
+        # missing properties
+        msg = self._lookup_request("ignore", False)
+        msg.properties = None
+        self._invalid_request_test(msg)
+
+        # missing version
+        msg = self._lookup_request("ignore", False)
+        del msg.properties['version']
+        self._invalid_request_test(msg)
+
+        # invalid version
+        msg = self._lookup_request("ignore", False)
+        msg.properties['version'] = "blech"
+        self._invalid_request_test(msg)
+
+        # unsupported version
+        msg = self._lookup_request("ignore", False)
+        msg.properties['version'] = 97387187
+        self._invalid_request_test(msg)
+
+        # missing opcode
+        msg = self._lookup_request("ignore", False)
+        del msg.properties['opcode']
+        self._invalid_request_test(msg)
+
+        # bad opcode
+        msg = self._lookup_request("ignore", False)
+        msg.properties['opcode'] = "snarf"
+        self._invalid_request_test(msg)
+
+        # bad body
+        msg = self._lookup_request("ignore", False)
+        msg.body = [71]
+        self._invalid_request_test(msg)
+
+    def test_lookup_bad_connection(self):
+        """
+        Verify that clients connected via non-edge connections fail
+        """
+        bc = BlockingConnection(self.INT_A.listener, timeout=TIMEOUT)
+        with self.assertRaises(LinkDetached):
+            SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+        bc.close()
+
+        bc = BlockingConnection(self.INT_A.inter_router_listener, 
timeout=TIMEOUT)
+        with self.assertRaises(LinkDetached):
+            SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+        bc.close()
+
+        # consuming from the lookup address is forbidden:
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        with self.assertRaises(LinkDetached):
+            rcv = bc.create_receiver(self.QD_TERMINUS_ADDRESS_LOOKUP)
+        bc.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to