Github user ganeshmurthy commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/244#discussion_r161883427
  
    --- Diff: src/router_core/exchange_bindings.c ---
    @@ -0,0 +1,1336 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +#include <qpid/dispatch/ctools.h>
    +#include <inttypes.h>
    +#include <stdio.h>
    +#include "router_core_private.h"
    +#include "exchange_bindings.h"
    +
    +
    +// next_hop_t
    +// Describes the destination of a forwarded message
    +// May be shared by different bindings
    +//
    +typedef struct next_hop_t next_hop_t;
    +struct next_hop_t
    +{
    +    // per-exchange list of all next hops
    +    DEQ_LINKS_N(exchange_list, next_hop_t);
    +    // when hooked to the transmit list
    +    DEQ_LINKS_N(transmit_list, next_hop_t);
    +
    +    int                  ref_count;  // binding references
    +    int                  phase;
    +    bool                 on_xmit_list;
    +    qdr_exchange_t      *exchange;
    +    qd_iterator_t       *next_hop;
    +    unsigned char       *next_hop_str;
    +    qdr_address_t       *qdr_addr;
    +};
    +
    +ALLOC_DECLARE(next_hop_t);
    +ALLOC_DEFINE(next_hop_t);
    +DEQ_DECLARE(next_hop_t, next_hop_list_t);
    +
    +// qdr_binding_t
    +// Represents a subject key --> next hop mapping
    +// A binding is uniquely identified by the tuple (pattern, nextHop, phase).  No
    +// two bindings with the same tuple value can exist on an exchange.
    +// The binding is implemented using two classes: qdr_binding_t and
    +// next_hop_t. The qdr_binding_t holds the pattern and points to the
    +// next_hop_t.  This allows different patterns to share the same nextHop.
    +// Since there is only one next_hop_t instance for each (nextHop, phase) value,
    +// we guarantee only 1 copy of a message is forwarded to a given nextHop+phase
    +// even if multiple distinct patterns are matched. Ex: a message with a
    +// value of "a.b" will match two distinct binding keys "+.b" and "a.+".  If
    +// both these patterns share the same next_hop_t only 1 copy of the message
    +// will be forwarded.
    +typedef struct qdr_binding qdr_binding_t;
    +struct qdr_binding
    +{
    +    // per-exchange list of all bindings
    +    DEQ_LINKS_N(exchange_list, qdr_binding_t);
    +    // parse tree node's list of bindings sharing the same pattern
    +    DEQ_LINKS_N(tree_list, qdr_binding_t);
    +
    +    qd_iterator_t       *name;
    +    unsigned char       *name_str;
    +    uint64_t             identity;
    +    qdr_exchange_t      *exchange;
    +
    +    // the routing key
    +    qd_iterator_t       *key;
    +    unsigned char       *key_str;
    +    next_hop_t          *next_hop;
    +
    +    uint64_t             msgs_matched;
    +};
    +
    +ALLOC_DECLARE(qdr_binding_t);
    +ALLOC_DEFINE(qdr_binding_t);
    +DEQ_DECLARE(qdr_binding_t, qdr_binding_list_t);
    +
    +
    +struct qdr_exchange {
    +    DEQ_LINKS(qdr_exchange_t);          // for core->exchanges
    +    qdr_core_t         *core;
    +    uint64_t            identity;
    +    qd_iterator_t      *name;
    +    unsigned char      *name_str;
    +    qd_iterator_t      *address;
    +    unsigned char      *address_str;
    +    int                 phase;
    +    qd_parse_tree_t    *parse_tree;
    +    qdr_address_t      *qdr_addr;
    +    next_hop_t         *alternate;
    +    qdr_binding_list_t  bindings;
    +    next_hop_list_t     next_hops;
    +
    +    uint64_t msgs_received;
    +    uint64_t msgs_dropped;
    +    uint64_t msgs_routed;
    +    uint64_t msgs_alternate;
    +};
    +
    +ALLOC_DECLARE(qdr_exchange_t);
    +ALLOC_DEFINE(qdr_exchange_t);
    +
    +static void qdr_exchange_free(qdr_exchange_t *ex);
    +static qdr_exchange_t *qdr_exchange(qdr_core_t    *core,
    +                                    qd_iterator_t *name,
    +                                    qd_iterator_t *address,
    +                                    int            phase,
    +                                    qd_iterator_t *alternate,
    +                                    int            alt_phase,
    +                                    qd_parse_tree_type_t method);
    +static void write_config_exchange_map(qdr_exchange_t      *ex,
    +                                      qd_composed_field_t *body);
    +static qdr_exchange_t *find_exchange(qdr_core_t    *core,
    +                                     qd_iterator_t *identity,
    +                                     qd_iterator_t *name);
    +static qdr_binding_t *find_binding(qdr_core_t *core,
    +                                   qd_iterator_t  *identity,
    +                                   qd_iterator_t  *name);
    +static void write_config_exchange_list(qdr_exchange_t *ex,
    +                                       qdr_query_t    *query);
    +static qdr_binding_t *qdr_binding(qdr_exchange_t *ex,
    +                                  qd_iterator_t  *name,
    +                                  qd_iterator_t  *key,
    +                                  qd_iterator_t  *next_hop,
    +                                  int             phase);
    +static void write_config_binding_map(qdr_binding_t       *binding,
    +                                     qd_composed_field_t *body);
    +static qdr_binding_t *find_binding(qdr_core_t    *core,
    +                                   qd_iterator_t *identity,
    +                                   qd_iterator_t *name);
    +static void qdr_binding_free(qdr_binding_t *b);
    +static void write_config_binding_list(qdr_binding_t *binding,
    +                                      qdr_query_t   *query);
    +static qdr_binding_t *get_binding_at_index(qdr_core_t *core,
    +                                           int         index);
    +static next_hop_t *next_hop(qdr_exchange_t *ex,
    +                            qd_iterator_t  *address,
    +                            int             phase);
    +static void next_hop_release(next_hop_t *next_hop);
    +static next_hop_t *find_next_hop(qdr_exchange_t *ex,
    +                                 qd_iterator_t  *address,
    +                                 int             phase);
    +static bool gather_next_hops(void *handle,
    +                             const char *pattern,
    +                             void *payload);
    +static int send_message(qdr_core_t     *core,
    +                        next_hop_t     *next_hop,
    +                        qd_message_t   *msg,
    +                        qdr_delivery_t *in_delivery,
    +                        bool            exclude_inprocess,
    +                        bool            control);
    +
    +
    +//
    +// The Exchange Forwarder
    +//
    +int qdr_forward_exchange_CT(qdr_exchange_t *ex,
    +                            qd_message_t   *msg,
    +                            qdr_delivery_t *in_delivery,
    +                            bool            exclude_inprocess,
    +                            bool            control)
    +{
    +    int forwarded = 0;
    +    qdr_core_t *core = ex->core;
    +    const bool presettled = !!in_delivery ? in_delivery->settled : true;
    +
    +    ex->msgs_received += 1;
    +
    +    // @TODO(kgiusti): de-duplicate this code (cut & paste from multicast
    +    // forwarder)
    +    //
    +    // If the delivery is not presettled, set the settled flag for forwarding so all
    +    // outgoing deliveries will be presettled.
    +    //
    +    // NOTE:  This is the only multicast mode currently supported.  Others will likely be
    +    //        implemented in the future.
    +    //
    +    if (!presettled) {
    +        in_delivery->settled = true;
    +        //
    +        // If the router is configured to reject unsettled multicasts, settle and reject this delivery.
    +        //
    +        if (!core->qd->allow_unsettled_multicast) {
    +            in_delivery->disposition = PN_REJECTED;
    +            in_delivery->error = qdr_error("qd:forbidden", "Deliveries to 
an exchange must be pre-settled");
    +            qdr_delivery_push_CT(core, in_delivery);
    +            return 0;
    +        }
    +    }
    +
    +    qd_iterator_t *subject = qd_message_check(msg, QD_DEPTH_PROPERTIES)
    +        ? qd_message_field_iterator(msg, QD_FIELD_SUBJECT)
    +        : NULL;
    +    next_hop_list_t transmit_list;
    +    DEQ_INIT(transmit_list);
    +
    +    if (subject) {
    +        // find all matching bindings and build a list of their next hops
    +        qd_parse_tree_search(ex->parse_tree, subject, gather_next_hops, 
&transmit_list);
    +        qd_iterator_free(subject);
    +    }
    +
    +    // if there are valid next hops then we're routing this message based on an
    +    // entirely new destination address.  We need to reset the origin and the
    +    // excluded link flags in the delivery.  We also need to reset the trace
    +    // annotations and ingress field in the message. This is done because it is
    +    // possible that the next hop is reached via the same link/router this
    +    // message arrived from.
    +    // @TODO(kgiusti) - loop detection
    +    if (DEQ_SIZE(transmit_list) > 0 || ex->alternate) {
    +        if (in_delivery) {
    +            in_delivery->origin = 0;
    +            qd_bitmask_free(in_delivery->link_exclusion);
    +            in_delivery->link_exclusion = 0;
    +        }
    +
    +        const char *node_id = qd_router_id(core->qd);
    +        qd_composed_field_t *trace_field = qd_compose_subfield(0);
    +        qd_compose_start_list(trace_field);
    +        qd_compose_insert_string(trace_field, node_id);
    +        qd_compose_end_list(trace_field);
    +        qd_message_set_trace_annotation(msg, trace_field);
    +
    +        qd_composed_field_t *ingress_field = qd_compose_subfield(0);
    +        qd_compose_insert_string(ingress_field, node_id);
    +        qd_message_set_ingress_annotation(msg, ingress_field);
    +    }
    +
    +    next_hop_t *next_hop = DEQ_HEAD(transmit_list);
    +    while (next_hop) {
    +        DEQ_REMOVE_N(transmit_list, transmit_list, next_hop);
    +        next_hop->on_xmit_list = false;
    +        assert(next_hop->qdr_addr);
    +        // @TODO(kgiusti) - non-recursive handling of next hop if it is an exchange
    +        forwarded += send_message(ex->core, next_hop, msg, in_delivery, 
exclude_inprocess, control);
    +        next_hop = DEQ_HEAD(transmit_list);
    +    }
    +
    +    if (forwarded == 0 && ex->alternate) {
    +        forwarded = send_message(ex->core, ex->alternate, msg, 
in_delivery, exclude_inprocess, control);
    +        if (forwarded) {
    +            ex->msgs_alternate += 1;
    +        }
    +    }
    +
    +    // @TODO(kgiusti): de-duplicate the settlement code (cut & paste from
    +    // multicast forwarder)
    +    if (forwarded == 0) {
    +        ex->msgs_dropped += 1;
    +        if (!presettled) {
    +            //
    +            // The delivery was not originally presettled and it was not
    +            // forwarded to any destinations, return it to its original
    +            // unsettled state.
    +            //
    +            in_delivery->settled = false;
    +        }
    +    } else {
    +        ex->msgs_routed += 1;
    +        if (in_delivery && !presettled) {
    +            //
    +            // The delivery was not presettled and it was forwarded to at least
    +            // one destination.  Accept and settle the delivery only if the
    +            // entire delivery has been received.
    +            //
    +            const bool receive_complete = 
qd_message_receive_complete(qdr_delivery_message(in_delivery));
    +            if (receive_complete) {
    +                in_delivery->disposition = PN_ACCEPTED;
    +                qdr_delivery_push_CT(core, in_delivery);
    +            }
    +        }
    +    }
    +
    +    return forwarded;
    +}
    +
    +
    +// callback from parse tree search:
    +// handle = transmit_list containing all matching next_hops
    +// pattern = pattern that matches the search (ignored)
    +// payload = list of bindings configured for the pattern
    +static bool gather_next_hops(void *handle, const char *pattern, void 
*payload)
    +{
    +    next_hop_list_t *transmit_list = (next_hop_list_t *)handle;
    +    qdr_binding_list_t *bindings = (qdr_binding_list_t *)payload;
    +
    +    qdr_binding_t *binding = DEQ_HEAD(*bindings);
    +    while (binding) {
    +        binding->msgs_matched += 1;
    +        // note - since multiple bindings may reference the next hop, it is
    +        // possible a next hop has already been added to the transmit list.
    +        // do not re-add.  This is not thread safe but that is fine since all
    +        // forwarding is done on the core thread.
    +        if (!binding->next_hop->on_xmit_list) {
    +            DEQ_INSERT_TAIL_N(transmit_list, *transmit_list, 
binding->next_hop);
    +            binding->next_hop->on_xmit_list = true;
    +        }
    +        binding = DEQ_NEXT_N(tree_list, binding);
    +    }
    +    return true;  // keep searching
    +}
    +
    +
    +// Forward a copy of the message to the next hop's address
    +static int send_message(qdr_core_t     *core,
    +                        next_hop_t     *next_hop,
    +                        qd_message_t   *msg,
    +                        qdr_delivery_t *in_delivery,
    +                        bool            exclude_inprocess,
    +                        bool            control)
    +{
    +    int count = 0;
    +    qd_message_t *copy = qd_message_copy(msg);
    +
    +    qd_log(core->log, QD_LOG_TRACE, "Exchange '%s' forwarding message to 
'%s'",
    +           next_hop->exchange->name_str, next_hop->next_hop_str);
    +
    +    // set "to override" and "phase" message annotations based on the next hop
    +    qd_composed_field_t *to_field = qd_compose_subfield(0);
    +    qd_compose_insert_string(to_field, (char *)next_hop->next_hop_str);
    +    qd_message_set_to_override_annotation(copy, to_field);  // frees 
to_field
    +    qd_message_set_phase_annotation(copy, next_hop->phase);
    +
    +    count = qdr_forward_message_CT(core, next_hop->qdr_addr, copy, 
in_delivery, exclude_inprocess, control);
    +    qd_message_free(copy);
    +
    +    return count;
    +}
    +
    +
    +long qdr_exchange_binding_count(const qdr_exchange_t *ex)
    +{
    +    return (long) DEQ_SIZE(ex->bindings);
    +}
    +
    +
    +qdr_address_t *qdr_exchange_alternate_addr(const qdr_exchange_t *ex)
    +{
    +    return (ex->alternate) ? ex->alternate->qdr_addr : NULL;
    +}
    +
    +
    +/////////////////////////////
    +// Exchange Management API //
    +/////////////////////////////
    +
    +#define QDR_CONFIG_EXCHANGE_NAME          0
    +#define QDR_CONFIG_EXCHANGE_IDENTITY      1
    +#define QDR_CONFIG_EXCHANGE_ADDRESS       2
    +#define QDR_CONFIG_EXCHANGE_PHASE         3
    +#define QDR_CONFIG_EXCHANGE_ALTERNATE     4
    +#define QDR_CONFIG_EXCHANGE_ALT_PHASE     5
    +#define QDR_CONFIG_EXCHANGE_MATCH_METHOD  6
    +#define QDR_CONFIG_EXCHANGE_BINDING_COUNT 7
    +#define QDR_CONFIG_EXCHANGE_RECEIVED      8
    +#define QDR_CONFIG_EXCHANGE_DROPPED       9
    +#define QDR_CONFIG_EXCHANGE_FORWARDED     10
    +#define QDR_CONFIG_EXCHANGE_ALTERNATED    11
    +
    +const char *qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_COLUMN_COUNT + 
1] =
    +    {"name",
    +     "identity",
    +     "address",
    +     "phase",
    +     "alternate",
    +     "alternatePhase",
    +     "matchMethod",
    +     "bindingCount",
    +     "deliveriesReceived",
    +     "deliveriesDropped",
    +     "deliveriesForwarded",
    +     "deliveriesAlternate",
    +     0};
    +
    +// from management_agent.c
    +extern const unsigned char *config_exchange_entity_type;
    +
    +#define QDR_CONFIG_BINDING_NAME         0
    +#define QDR_CONFIG_BINDING_IDENTITY     1
    +#define QDR_CONFIG_BINDING_EXCHANGE     2
    +#define QDR_CONFIG_BINDING_KEY          3
    +#define QDR_CONFIG_BINDING_NEXTHOP      4
    +#define QDR_CONFIG_BINDING_PHASE        5
    +#define QDR_CONFIG_BINDING_MATCHED      6
    +
    +const char *qdr_config_binding_columns[QDR_CONFIG_BINDING_COLUMN_COUNT + 
1] =
    +    {"name",
    +     "identity",
    +     "exchange",
    +     "key",
    +     "nextHop",
    +     "phase",
    +     "deliveriesMatched",
    +     0};
    +
    +// from management_agent.c
    +extern const unsigned char *config_binding_entity_type;
    +
    +
    +// called on core shutdown to release all exchanges
    +//
    +void qdr_exchange_free_all(qdr_core_t *core)
    +{
    +    qdr_exchange_t *ex = DEQ_HEAD(core->exchanges);
    +    while (ex) {
    +        qdr_exchange_t *next = DEQ_NEXT(ex);
    +        qdr_exchange_free(ex);
    +        ex = next;
    +    }
    +}
    +
    +
    +// Exchange CREATE
    +//
    +//
    +void qdra_config_exchange_create_CT(qdr_core_t         *core,
    +                                    qd_iterator_t      *name,
    +                                    qdr_query_t        *query,
    +                                    qd_parsed_field_t  *in_body)
    +{
    +    qdr_exchange_t *ex = NULL;
    +
    +    query->status = QD_AMQP_BAD_REQUEST;
    +
    +    if (!qd_parse_is_map(in_body)) {
    +        query->status.description = "Body of request must be a map";
    +        goto exit;
    +    }
    +
    +    if (!name) {
    +        query->status.description = "exchange requires a unique name";
    +        goto exit;
    +    }
    +
    +    qd_parsed_field_t *address_field = qd_parse_value_by_key(in_body,
    +                                                             
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ADDRESS]);
    +    if (!address_field) {
    +        query->status.description = "exchange address is mandatory";
    +        goto exit;
    +    }
    +    qd_iterator_t *address = qd_parse_raw(address_field);
    +
    +    // check for duplicates
    +    {
    +        qdr_exchange_t *eptr = 0;
    +        for (eptr = DEQ_HEAD(core->exchanges); eptr; eptr = 
DEQ_NEXT(eptr)) {
    +            if (qd_iterator_equal(address, eptr->address_str)) {
    +                query->status.description = "duplicate exchange address";
    +                goto exit;
    +            } else if (qd_iterator_equal(name, eptr->name_str)) {
    +                query->status.description = "duplicate exchange name";
    +                goto exit;
    +            }
    +        }
    +    }
    +
    +    qd_parsed_field_t *method_field = qd_parse_value_by_key(in_body,
    +                                                            
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_MATCH_METHOD]);
    +    qd_parse_tree_type_t method = QD_PARSE_TREE_AMQP_0_10;
    +    if (method_field) {
    +        if (qd_iterator_equal(qd_parse_raw(method_field), (const unsigned 
char *)"mqtt")) {
    +            method = QD_PARSE_TREE_MQTT;
    +        } else if (!qd_iterator_equal(qd_parse_raw(method_field), (const 
unsigned char *)"0-10")) {
    +            query->status.description = "Exchange matchMethod must be 
either '0-10' or 'mqtt'";
    +            goto exit;
    +        }
    +    }
    +
    +    int phase = 0;
    +    qd_parsed_field_t *phase_field = qd_parse_value_by_key(in_body,
    +                                                           
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_PHASE]);
    +    if (phase_field) {
    +        phase = qd_parse_as_int(phase_field);
    +        if (phase < 0 || phase > 9) {
    +            query->status.description = "phase must be in the range 0-9";
    +            goto exit;
    +        }
    +    }
    +
    +    qd_iterator_t *alternate = NULL;
    +    int alt_phase = 0;
    +    qd_parsed_field_t *alternate_field = qd_parse_value_by_key(in_body,
    +                                                               
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALTERNATE]);
    +    if (alternate_field) {
    +        alternate = qd_parse_raw(alternate_field);
    +        qd_parsed_field_t *alt_phase_field = qd_parse_value_by_key(in_body,
    +                                                                   
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALT_PHASE]);
    +        if (alt_phase_field) {
    +            alt_phase = qd_parse_as_int(alt_phase_field);
    +            if (alt_phase < 0 || alt_phase > 9) {
    +                query->status.description = "phase must be in the range 
0-9";
    +                goto exit;
    +            }
    +        }
    +    }
    +
    +    ex = qdr_exchange(core, name, address, phase, alternate, alt_phase, 
method);
    +    if (ex) {
    +        // @TODO(kgiusti) - for now, until the behavior is nailed down:
    +        static int warn_user;
    +        if (!warn_user) {
    +            warn_user = 1;
    +            qd_log(core->agent_log, QD_LOG_WARNING,
    +                   "The Exchange/Binding feature is currently 
EXPERIMENTAL."
    +                   " Its functionality may change in future releases"
    +                   " of the Qpid Dispatch Router. Backward compatibility 
is"
    +                   " not guaranteed.");
    +        }
    +        query->status = QD_AMQP_CREATED;
    +        if (query->body) {
    +            write_config_exchange_map(ex, query->body);
    +        }
    +    } else {
    +        query->status.description = "failed to allocate exchange";
    +    }
    +
    + exit:
    +
    +    if (query->status.status == QD_AMQP_CREATED.status) {
    +        qd_log(core->agent_log, QD_LOG_DEBUG,
    +               "Exchange %s CREATED (id=%"PRIu64")", ex->name_str, 
ex->identity);
    +
    +    } else {
    +        qd_log(core->agent_log, QD_LOG_ERROR,
    +               "Error performing CREATE of %s: %s", 
config_exchange_entity_type, query->status.description);
    +        // return a NULL body:
    +        if (query->body) qd_compose_insert_null(query->body);
    +    }
    +
    +    if (query->body) {
    +        qdr_agent_enqueue_response_CT(core, query);
    +    } else {
    +        // no body == create from internal config parser
    +        qdr_query_free(query);
    +    }
    +}
    +
    +
    +// Exchange DELETE:
    +//
    +void qdra_config_exchange_delete_CT(qdr_core_t    *core,
    +                                    qdr_query_t   *query,
    +                                    qd_iterator_t *name,
    +                                    qd_iterator_t *identity)
    +{
    +    qdr_exchange_t *ex = 0;
    +
    +    if (!name && !identity) {
    +        query->status = QD_AMQP_BAD_REQUEST;
    +        query->status.description = "No name or identity provided";
    +        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing DELETE of 
%s: %s",
    +               config_exchange_entity_type, query->status.description);
    +    } else {
    +        ex = find_exchange(core, identity, name);
    +        if (ex) {
    +            qd_log(core->agent_log, QD_LOG_DEBUG,
    +                   "Exchange %s DELETED (id=%"PRIu64")", ex->name_str, 
ex->identity);
    +            qdr_exchange_free(ex);
    +            query->status = QD_AMQP_NO_CONTENT;
    +        } else
    +            query->status = QD_AMQP_NOT_FOUND;
    +    }
    +
    +    qdr_agent_enqueue_response_CT(core, query);
    +}
    +
    +// Exchange GET
    +//
    +void qdra_config_exchange_get_CT(qdr_core_t    *core,
    +                                 qd_iterator_t *name,
    +                                 qd_iterator_t *identity,
    +                                 qdr_query_t   *query,
    +                                 const char    *columns[])
    +{
    +    qdr_exchange_t *ex = 0;
    +
    +    if (!name && !identity) {
    +        query->status = QD_AMQP_BAD_REQUEST;
    +        query->status.description = "No name or identity provided";
    +        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of 
%s: %s",
    +               config_exchange_entity_type, query->status.description);
    +    }
    +    else {
    +        ex = find_exchange(core, identity, name);
    +        if (!ex) {
    +            query->status = QD_AMQP_NOT_FOUND;
    +        }
    +        else {
    +            if (query->body) write_config_exchange_map(ex, query->body);
    +            query->status = QD_AMQP_OK;
    +        }
    +    }
    +
    +    qdr_agent_enqueue_response_CT(core, query);
    +}
    +
    +
    +// Exchange GET first:
    +//
    +void qdra_config_exchange_get_first_CT(qdr_core_t *core, qdr_query_t 
*query, int offset)
    +{
    +    //
    +    // Queries that get this far will always succeed.
    +    //
    +    query->status = QD_AMQP_OK;
    +
    +    //
    +    // If the offset goes beyond the set of objects, end the query now.
    +    //
    +    if (offset >= DEQ_SIZE(core->exchanges)) {
    +        query->more = false;
    +        qdr_agent_enqueue_response_CT(core, query);
    +        return;
    +    }
    +
    +    //
    +    // Run to the object at the offset.
    +    //
    +    qdr_exchange_t *ex = DEQ_HEAD(core->exchanges);
    +    for (int i = 0; i < offset && ex; i++)
    +        ex = DEQ_NEXT(ex);
    +    assert(ex);
    +
    +    //
    +    // Write the columns of the object into the response body.
    +    //
    +    if (query->body) write_config_exchange_list(ex, query);
    +
    +    //
    +    // Advance to the next exchange
    +    //
    +    query->next_offset = offset + 1;
    +    query->more = !!DEQ_NEXT(ex);
    +
    +    //
    +    // Enqueue the response.
    +    //
    +    qdr_agent_enqueue_response_CT(core, query);
    +}
    +
    +// Exchange GET-NEXT
    +//
    +void qdra_config_exchange_get_next_CT(qdr_core_t *core, qdr_query_t *query)
    +{
    +    qdr_exchange_t *ex = 0;
    +
    +    if (query->next_offset < DEQ_SIZE(core->exchanges)) {
    +        ex = DEQ_HEAD(core->exchanges);
    +        for (int i = 0; i < query->next_offset && ex; i++)
    +            ex = DEQ_NEXT(ex);
    +    }
    +
    +    if (ex) {
    +        //
    +        // Write the columns of the exchange entity into the response body.
    +        //
    +        if (query->body) write_config_exchange_list(ex, query);
    +
    +        //
    +        // Advance to the next object
    +        //
    +        query->next_offset++;
    +        query->more = !!DEQ_NEXT(ex);
    +    } else
    +        query->more = false;
    +
    +    //
    +    // Enqueue the response.
    +    //
    +    qdr_agent_enqueue_response_CT(core, query);
    +}
    +
    +
    +// Binding CREATE
    +void qdra_config_binding_create_CT(qdr_core_t         *core,
    +                                   qd_iterator_t      *name,
    +                                   qdr_query_t        *query,
    +                                   qd_parsed_field_t  *in_body)
    +{
    +    qdr_binding_t *binding = NULL;
    +    qdr_exchange_t *ex = NULL;
    +    qd_iterator_t *key = NULL;
    +
    +    query->status = QD_AMQP_BAD_REQUEST;
    +
    +    if (!qd_parse_is_map(in_body)) {
    +        query->status.description = "Body of request must be a map";
    +        goto exit;
    +    }
    +
    +    qd_parsed_field_t *exchange_field = qd_parse_value_by_key(in_body,
    +                                                              
qdr_config_binding_columns[QDR_CONFIG_BINDING_EXCHANGE]);
    +    if (!exchange_field) {
    +        query->status.description = "Binding configuration requires an 
exchange";
    +        goto exit;
    +    }
    +
    +    // lookup the exchange by its name:
    +    ex = find_exchange(core, NULL, qd_parse_raw(exchange_field));
    +    if (!ex) {
    +        query->status.description = "Named exchange does not exist";
    +        goto exit;
    +    }
    +
    +    qd_parsed_field_t *next_hop_field = qd_parse_value_by_key(in_body,
    +                                                              
qdr_config_binding_columns[QDR_CONFIG_BINDING_NEXTHOP]);
    +    if (!next_hop_field) {
    +        query->status.description = "No next hop specified";
    +        goto exit;
    +    }
    +    qd_iterator_t *nhop = qd_parse_raw(next_hop_field);
    +
    +    qd_parsed_field_t *key_field = qd_parse_value_by_key(in_body,
    +                                                         
qdr_config_binding_columns[QDR_CONFIG_BINDING_KEY]);
    +    // if no pattern given, assume match all "#":
    +    key = key_field ? qd_iterator_dup(qd_parse_raw(key_field)) : 
qd_iterator_string("#", ITER_VIEW_ALL);
    +
    +    if (!qd_parse_tree_validate_pattern(ex->parse_tree, key)) {
    +        query->status.description = "The binding key pattern is invalid";
    +        goto exit;
    +    }
    +
    +    qd_parsed_field_t *phase_field = qd_parse_value_by_key(in_body,
    +                                                         
qdr_config_binding_columns[QDR_CONFIG_BINDING_PHASE]);
    +    int phase = (phase_field ? qd_parse_as_int(phase_field) : 0);
    +    if (phase < 0 || phase > 9) {
    +        query->status.description = "phase must be in the range 0-9";
    +        goto exit;
    +    }
    +
    +    // check for duplicates: the name and the tuple (key, next hop, phase) must
    +    // be unique per exchange
    +
    +    for (qdr_binding_t *b = DEQ_HEAD(ex->bindings); b; b = 
DEQ_NEXT_N(exchange_list, b)) {
    +        if (name && b->name_str && qd_iterator_equal(name, b->name_str)) {
    +            query->status.description = "Duplicate next hop name";
    +            goto exit;
    +        } else if (qd_iterator_equal(key, b->key_str) &&
    +                   qd_iterator_equal(nhop, b->next_hop->next_hop_str) &&
    +                   phase == b->next_hop->phase) {
    +            query->status.description = "Next hop for key already exists";
    +            goto exit;
    +        }
    +    }
    +
    +    binding = qdr_binding(ex, name, key, nhop, phase);
    +    if (binding) {
    +        query->status = QD_AMQP_CREATED;
    +        if (query->body) {
    +            write_config_binding_map(binding, query->body);
    +        }
    +    } else {
    +        query->status.description = "Failed to allocate next hop";
    +    }
    +
    +
    + exit:
    +
    +    if (query->status.status == QD_AMQP_CREATED.status) {
    +        qd_log(core->agent_log, QD_LOG_DEBUG,
    +               "Exchange %s Binding %s -> %s CREATED (id=%"PRIu64")", 
ex->name_str,
    +               binding->key_str, binding->next_hop->next_hop_str, 
binding->identity);
    +    } else {
    +        qd_log(core->agent_log, QD_LOG_ERROR,
    +               "Error performing CREATE of %s: %s",
    +               config_binding_entity_type,
    +               query->status.description);
    +        // return a NULL body:
    +        if (query->body) qd_compose_insert_null(query->body);
    +    }
    +
    +    if (query->body) {
    +        qdr_agent_enqueue_response_CT(core, query);
    +    } else {
    +        // no body == create from internal config parser
    +        qdr_query_free(query);
    +    }
    +
    +    if (key) qd_iterator_free(key);
    +}
    +
    +
    +// Binding DELETE: removes the binding that find_binding() returns for the given identity/name.
    +// The outcome is stored in query->status and the query is enqueued as a response on every path.
    +void qdra_config_binding_delete_CT(qdr_core_t    *core,
    +                                   qdr_query_t   *query,
    +                                   qd_iterator_t *name,
    +                                   qd_iterator_t *identity)
    +{
    +    if (!identity && !name) {  // neither lookup key was supplied: reject the request
    +        query->status = QD_AMQP_BAD_REQUEST;
    +        query->status.description = "No binding name or identity provided";
    +        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing DELETE of 
%s: %s",
    +               config_binding_entity_type, query->status.description);
    +    } else {
    +        qdr_binding_t *binding = find_binding(core, identity, name);
    +        if (!binding) {
    +            query->status = QD_AMQP_NOT_FOUND;  // nothing is logged on this path
    +        } else {
    +            qd_log(core->agent_log, QD_LOG_DEBUG,
    +                   "Binding %s -> %s on exchange %s DELETED 
(id=%"PRIu64")",
    +                   binding->key_str,
    +                   binding->next_hop->next_hop_str,
    +                   binding->exchange->name_str,
    +                   binding->identity);
    +            qdr_binding_free(binding);  // must follow the log call above, which reads fields of binding
    +            query->status = QD_AMQP_NO_CONTENT;  // success status for a completed delete
    +        }
    +    }
    +
    +    qdr_agent_enqueue_response_CT(core, query);  // reached for the bad-request, not-found and success cases alike
    +}
    +
    +
    +// Binding GET: reads one binding, located by find_binding() from the given identity/name.
    +// The parameter order differs from the DELETE handler, and columns is not referenced in this body.
    +void qdra_config_binding_get_CT(qdr_core_t    *core,
    +                                qd_iterator_t *name,
    +                                qd_iterator_t *identity,
    +                                qdr_query_t   *query,
    +                                const char    *columns[])
    +{
    +    if (!identity && !name) {  // neither lookup key was supplied: reject the request
    +        query->status = QD_AMQP_BAD_REQUEST;
    +        query->status.description = "No binding name or identity provided";
    +        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of 
%s: %s",
    +               config_binding_entity_type, query->status.description);
    +    } else {
    +        qdr_binding_t *binding = find_binding(core, identity, name);
    +        if (binding == 0) {
    +            query->status = QD_AMQP_NOT_FOUND;  // nothing is logged and nothing is written to the body
    +        }
    +        else {
    +            if (query->body) write_config_binding_map(binding, 
query->body);  // the binding is only serialized when the query has a body
    +            query->status = QD_AMQP_OK;
    +        }
    +    }
    +
    +    qdr_agent_enqueue_response_CT(core, query);  // the query is enqueued on every path
    +}
    +
    +
    +// Binding GET first: returns the binding that get_binding_at_index() yields for offset.
    +// Sets query->next_offset to offset + 1 and query->more to whether a further entry may follow.
    +void qdra_config_binding_get_first_CT(qdr_core_t *core, qdr_query_t 
*query, int offset)
    +{
    +    query->status = QD_AMQP_OK;  // status is OK even when no binding exists at offset
    +
    +    qdr_binding_t *binding = get_binding_at_index(core, offset);
    +    if (!binding) {
    +        query->more = false;  // nothing at this offset: respond without writing a row
    +        qdr_agent_enqueue_response_CT(core, query);
    +        return;
    +    }
    +
    +    if (query->body) write_config_binding_list(binding, query);  // the row is only written when the query has a body
    +    query->next_offset = offset + 1;
    +    query->more = !!(DEQ_NEXT_N(exchange_list, binding) || 
DEQ_NEXT(binding->exchange));  // NOTE(review): true whenever a later exchange exists, even if it holds no bindings; qdra_config_binding_get_next_CT then finds no binding and clears more - confirm this is intended
    +    qdr_agent_enqueue_response_CT(core, query);
    +}
    +
    +
    +// Binding GET-NEXT: continues a listing from query->next_offset via get_binding_at_index().
    +// Unlike the GET first handler, query->status is not assigned anywhere in this function.
    +void qdra_config_binding_get_next_CT(qdr_core_t *core, qdr_query_t *query)
    +{
    +    qdr_binding_t *binding = get_binding_at_index(core, 
query->next_offset);
    +    if (binding) {
    +        if (query->body) write_config_binding_list(binding, query);  // the row is only written when the query has a body
    +        query->next_offset++;  // advance so the following call reads the next index
    +        query->more = !!(DEQ_NEXT_N(exchange_list, binding) || 
DEQ_NEXT(binding->exchange));  // true when this binding has a successor on its exchange_list or its exchange has a next element
    +    } else
    +        query->more = false;  // no binding at next_offset: the listing is finished
    +    qdr_agent_enqueue_response_CT(core, query);
    +}
    +
    +
    +// create a new iterator with its own copy of the data. caller must free 
both the
    +// iterator and the returned data buffer.  Used for retaining data 
extracted
    +// from query messages.
    +static qd_iterator_t *clone_iterator(qd_iterator_t *iter, unsigned char 
**data)
    --- End diff --
    
    Can this function be included as part of the iterator API instead of
being a static function? For example, as qd_clone_iterator()?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to