Github user ganeshmurthy commented on a diff in the pull request:
https://github.com/apache/qpid-dispatch/pull/244#discussion_r161883427
--- Diff: src/router_core/exchange_bindings.c ---
@@ -0,0 +1,1336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <qpid/dispatch/ctools.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include "router_core_private.h"
+#include "exchange_bindings.h"
+
+
+// next_hop_t
+// Describes the destination of a forwarded message
+// May be shared by different bindings
+//
+typedef struct next_hop_t next_hop_t;
+struct next_hop_t
+{
+ // per-exchange list of all next hops
+ DEQ_LINKS_N(exchange_list, next_hop_t);
+ // when hooked to the transmit list
+ DEQ_LINKS_N(transmit_list, next_hop_t);
+
+ int ref_count; // binding references
+ int phase;
+ bool on_xmit_list;
+ qdr_exchange_t *exchange;
+ qd_iterator_t *next_hop;
+ unsigned char *next_hop_str;
+ qdr_address_t *qdr_addr;
+};
+
+ALLOC_DECLARE(next_hop_t);
+ALLOC_DEFINE(next_hop_t);
+DEQ_DECLARE(next_hop_t, next_hop_list_t);
+
+// qdr_binding_t
+// Represents a subject key --> next hop mapping
+// A binding is uniquely identified by the tuple (pattern, nextHop,
phase). No
+// two bindings with the same tuple value can exist on an exchange.
+// The binding is implemented using two classes: qdr_binding_t and
+// next_hop_t. The qdr_binding_t holds the pattern and points to the
+// next_hop_t. This allows different patterns to share the same nextHop.
+// Since there is only one next_hop_t instance for each (nextHop, phase)
value,
+// we guarantee only 1 copy of a message is forwarded to a given
nextHop+phase
+// even if multiple distinct patterns are matched. Ex: a message with a
+// value of "a.b" will match two distict binding keys "+.b" and "a.+". If
+// both these patterns share the same next_hop_t only 1 copy of the message
+// will be forwarded.
+typedef struct qdr_binding qdr_binding_t;
+struct qdr_binding
+{
+ // per-exchange list of all bindings
+ DEQ_LINKS_N(exchange_list, qdr_binding_t);
+ // parse tree node's list of bindings sharing the same pattern
+ DEQ_LINKS_N(tree_list, qdr_binding_t);
+
+ qd_iterator_t *name;
+ unsigned char *name_str;
+ uint64_t identity;
+ qdr_exchange_t *exchange;
+
+ // the routing key
+ qd_iterator_t *key;
+ unsigned char *key_str;
+ next_hop_t *next_hop;
+
+ uint64_t msgs_matched;
+};
+
+ALLOC_DECLARE(qdr_binding_t);
+ALLOC_DEFINE(qdr_binding_t);
+DEQ_DECLARE(qdr_binding_t, qdr_binding_list_t);
+
+
+struct qdr_exchange {
+ DEQ_LINKS(qdr_exchange_t); // for core->exchanges
+ qdr_core_t *core;
+ uint64_t identity;
+ qd_iterator_t *name;
+ unsigned char *name_str;
+ qd_iterator_t *address;
+ unsigned char *address_str;
+ int phase;
+ qd_parse_tree_t *parse_tree;
+ qdr_address_t *qdr_addr;
+ next_hop_t *alternate;
+ qdr_binding_list_t bindings;
+ next_hop_list_t next_hops;
+
+ uint64_t msgs_received;
+ uint64_t msgs_dropped;
+ uint64_t msgs_routed;
+ uint64_t msgs_alternate;
+};
+
+ALLOC_DECLARE(qdr_exchange_t);
+ALLOC_DEFINE(qdr_exchange_t);
+
+static void qdr_exchange_free(qdr_exchange_t *ex);
+static qdr_exchange_t *qdr_exchange(qdr_core_t *core,
+ qd_iterator_t *name,
+ qd_iterator_t *address,
+ int phase,
+ qd_iterator_t *alternate,
+ int alt_phase,
+ qd_parse_tree_type_t method);
+static void write_config_exchange_map(qdr_exchange_t *ex,
+ qd_composed_field_t *body);
+static qdr_exchange_t *find_exchange(qdr_core_t *core,
+ qd_iterator_t *identity,
+ qd_iterator_t *name);
+static qdr_binding_t *find_binding(qdr_core_t *core,
+ qd_iterator_t *identity,
+ qd_iterator_t *name);
+static void write_config_exchange_list(qdr_exchange_t *ex,
+ qdr_query_t *query);
+static qdr_binding_t *qdr_binding(qdr_exchange_t *ex,
+ qd_iterator_t *name,
+ qd_iterator_t *key,
+ qd_iterator_t *next_hop,
+ int phase);
+static void write_config_binding_map(qdr_binding_t *binding,
+ qd_composed_field_t *body);
+static qdr_binding_t *find_binding(qdr_core_t *core,
+ qd_iterator_t *identity,
+ qd_iterator_t *name);
+static void qdr_binding_free(qdr_binding_t *b);
+static void write_config_binding_list(qdr_binding_t *binding,
+ qdr_query_t *query);
+static qdr_binding_t *get_binding_at_index(qdr_core_t *core,
+ int index);
+static next_hop_t *next_hop(qdr_exchange_t *ex,
+ qd_iterator_t *address,
+ int phase);
+static void next_hop_release(next_hop_t *next_hop);
+static next_hop_t *find_next_hop(qdr_exchange_t *ex,
+ qd_iterator_t *address,
+ int phase);
+static bool gather_next_hops(void *handle,
+ const char *pattern,
+ void *payload);
+static int send_message(qdr_core_t *core,
+ next_hop_t *next_hop,
+ qd_message_t *msg,
+ qdr_delivery_t *in_delivery,
+ bool exclude_inprocess,
+ bool control);
+
+
+//
+// The Exchange Forwarder
+//
+int qdr_forward_exchange_CT(qdr_exchange_t *ex,
+ qd_message_t *msg,
+ qdr_delivery_t *in_delivery,
+ bool exclude_inprocess,
+ bool control)
+{
+ int forwarded = 0;
+ qdr_core_t *core = ex->core;
+ const bool presettled = !!in_delivery ? in_delivery->settled : true;
+
+ ex->msgs_received += 1;
+
+ // @TODO(kgiusti): de-duplicate this code (cut & paste from multicast
+ // forwarder)
+ //
+ // If the delivery is not presettled, set the settled flag for
forwarding so all
+ // outgoing deliveries will be presettled.
+ //
+ // NOTE: This is the only multicast mode currently supported. Others
will likely be
+ // implemented in the future.
+ //
+ if (!presettled) {
+ in_delivery->settled = true;
+ //
+ // If the router is configured to reject unsettled multicasts,
settle and reject this delivery.
+ //
+ if (!core->qd->allow_unsettled_multicast) {
+ in_delivery->disposition = PN_REJECTED;
+ in_delivery->error = qdr_error("qd:forbidden", "Deliveries to
an exchange must be pre-settled");
+ qdr_delivery_push_CT(core, in_delivery);
+ return 0;
+ }
+ }
+
+ qd_iterator_t *subject = qd_message_check(msg, QD_DEPTH_PROPERTIES)
+ ? qd_message_field_iterator(msg, QD_FIELD_SUBJECT)
+ : NULL;
+ next_hop_list_t transmit_list;
+ DEQ_INIT(transmit_list);
+
+ if (subject) {
+ // find all matching bindings and build a list of their next hops
+ qd_parse_tree_search(ex->parse_tree, subject, gather_next_hops,
&transmit_list);
+ qd_iterator_free(subject);
+ }
+
+ // if there are valid next hops then we're routing this message based
on an
+ // entirely new destination address. We need to reset the origin and
the
+ // excluded link flags in the delivery. We also need to reset the
trace
+ // annotations and ingress field in the message. This is done because
it is
+ // possible that the next hop is reached via the same link/router this
+ // message arrived from.
+ // @TODO(kgiusti) - loop detection
+ if (DEQ_SIZE(transmit_list) > 0 || ex->alternate) {
+ if (in_delivery) {
+ in_delivery->origin = 0;
+ qd_bitmask_free(in_delivery->link_exclusion);
+ in_delivery->link_exclusion = 0;
+ }
+
+ const char *node_id = qd_router_id(core->qd);
+ qd_composed_field_t *trace_field = qd_compose_subfield(0);
+ qd_compose_start_list(trace_field);
+ qd_compose_insert_string(trace_field, node_id);
+ qd_compose_end_list(trace_field);
+ qd_message_set_trace_annotation(msg, trace_field);
+
+ qd_composed_field_t *ingress_field = qd_compose_subfield(0);
+ qd_compose_insert_string(ingress_field, node_id);
+ qd_message_set_ingress_annotation(msg, ingress_field);
+ }
+
+ next_hop_t *next_hop = DEQ_HEAD(transmit_list);
+ while (next_hop) {
+ DEQ_REMOVE_N(transmit_list, transmit_list, next_hop);
+ next_hop->on_xmit_list = false;
+ assert(next_hop->qdr_addr);
+ // @TODO(kgiusti) - non-recursive handling of next hop if it is an
exchange
+ forwarded += send_message(ex->core, next_hop, msg, in_delivery,
exclude_inprocess, control);
+ next_hop = DEQ_HEAD(transmit_list);
+ }
+
+ if (forwarded == 0 && ex->alternate) {
+ forwarded = send_message(ex->core, ex->alternate, msg,
in_delivery, exclude_inprocess, control);
+ if (forwarded) {
+ ex->msgs_alternate += 1;
+ }
+ }
+
+ // @TODO(kgiusti): de-duplicate the settlement code (cut & paste from
+ // multicast forwarder)
+ if (forwarded == 0) {
+ ex->msgs_dropped += 1;
+ if (!presettled) {
+ //
+ // The delivery was not originally presettled and it was not
+ // forwarded to any destinations, return it to its original
+ // unsettled state.
+ //
+ in_delivery->settled = false;
+ }
+ } else {
+ ex->msgs_routed += 1;
+ if (in_delivery && !presettled) {
+ //
+ // The delivery was not presettled and it was forwarded to at
least
+ // one destination. Accept and settle the delivery only if the
+ // entire delivery has been received.
+ //
+ const bool receive_complete =
qd_message_receive_complete(qdr_delivery_message(in_delivery));
+ if (receive_complete) {
+ in_delivery->disposition = PN_ACCEPTED;
+ qdr_delivery_push_CT(core, in_delivery);
+ }
+ }
+ }
+
+ return forwarded;
+}
+
+
+// callback from parse tree search:
+// handle = transmit_list containing all matching next_hops
+// pattern = pattern that matches the search (ignored)
+// payload = list of bindings configured for the pattern
+static bool gather_next_hops(void *handle, const char *pattern, void
*payload)
+{
+ next_hop_list_t *transmit_list = (next_hop_list_t *)handle;
+ qdr_binding_list_t *bindings = (qdr_binding_list_t *)payload;
+
+ qdr_binding_t *binding = DEQ_HEAD(*bindings);
+ while (binding) {
+ binding->msgs_matched += 1;
+ // note - since multiple bindings may reference the next hop, it is
+ // possible a next hop has already been added to the transmit list.
+ // do not re-add. This is not thread safe but that is fine since
all
+ // forwarding is done on the core thread.
+ if (!binding->next_hop->on_xmit_list) {
+ DEQ_INSERT_TAIL_N(transmit_list, *transmit_list,
binding->next_hop);
+ binding->next_hop->on_xmit_list = true;
+ }
+ binding = DEQ_NEXT_N(tree_list, binding);
+ }
+ return true; // keep searching
+}
+
+
+// Forward a copy of the message to the to_addr address
+static int send_message(qdr_core_t *core,
+ next_hop_t *next_hop,
+ qd_message_t *msg,
+ qdr_delivery_t *in_delivery,
+ bool exclude_inprocess,
+ bool control)
+{
+ int count = 0;
+ qd_message_t *copy = qd_message_copy(msg);
+
+ qd_log(core->log, QD_LOG_TRACE, "Exchange '%s' forwarding message to
'%s'",
+ next_hop->exchange->name_str, next_hop->next_hop_str);
+
+ // set "to override" and "phase" message annotations based on the next
hop
+ qd_composed_field_t *to_field = qd_compose_subfield(0);
+ qd_compose_insert_string(to_field, (char *)next_hop->next_hop_str);
+ qd_message_set_to_override_annotation(copy, to_field); // frees
to_field
+ qd_message_set_phase_annotation(copy, next_hop->phase);
+
+ count = qdr_forward_message_CT(core, next_hop->qdr_addr, copy,
in_delivery, exclude_inprocess, control);
+ qd_message_free(copy);
+
+ return count;
+}
+
+
+long qdr_exchange_binding_count(const qdr_exchange_t *ex)
+{
+ return (long) DEQ_SIZE(ex->bindings);
+}
+
+
+qdr_address_t *qdr_exchange_alternate_addr(const qdr_exchange_t *ex)
+{
+ return (ex->alternate) ? ex->alternate->qdr_addr : NULL;
+}
+
+
+/////////////////////////////
+// Exchange Management API //
+/////////////////////////////
+
+#define QDR_CONFIG_EXCHANGE_NAME 0
+#define QDR_CONFIG_EXCHANGE_IDENTITY 1
+#define QDR_CONFIG_EXCHANGE_ADDRESS 2
+#define QDR_CONFIG_EXCHANGE_PHASE 3
+#define QDR_CONFIG_EXCHANGE_ALTERNATE 4
+#define QDR_CONFIG_EXCHANGE_ALT_PHASE 5
+#define QDR_CONFIG_EXCHANGE_MATCH_METHOD 6
+#define QDR_CONFIG_EXCHANGE_BINDING_COUNT 7
+#define QDR_CONFIG_EXCHANGE_RECEIVED 8
+#define QDR_CONFIG_EXCHANGE_DROPPED 9
+#define QDR_CONFIG_EXCHANGE_FORWARDED 10
+#define QDR_CONFIG_EXCHANGE_ALTERNATED 11
+
+const char *qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_COLUMN_COUNT +
1] =
+ {"name",
+ "identity",
+ "address",
+ "phase",
+ "alternate",
+ "alternatePhase",
+ "matchMethod",
+ "bindingCount",
+ "deliveriesReceived",
+ "deliveriesDropped",
+ "deliveriesForwarded",
+ "deliveriesAlternate",
+ 0};
+
+// from management_agent.c
+extern const unsigned char *config_exchange_entity_type;
+
+#define QDR_CONFIG_BINDING_NAME 0
+#define QDR_CONFIG_BINDING_IDENTITY 1
+#define QDR_CONFIG_BINDING_EXCHANGE 2
+#define QDR_CONFIG_BINDING_KEY 3
+#define QDR_CONFIG_BINDING_NEXTHOP 4
+#define QDR_CONFIG_BINDING_PHASE 5
+#define QDR_CONFIG_BINDING_MATCHED 6
+
+const char *qdr_config_binding_columns[QDR_CONFIG_BINDING_COLUMN_COUNT +
1] =
+ {"name",
+ "identity",
+ "exchange",
+ "key",
+ "nextHop",
+ "phase",
+ "deliveriesMatched",
+ 0};
+
+// from management_agent.c
+extern const unsigned char *config_binding_entity_type;
+
+
+// called on core shutdown to release all exchanges
+//
+void qdr_exchange_free_all(qdr_core_t *core)
+{
+ qdr_exchange_t *ex = DEQ_HEAD(core->exchanges);
+ while (ex) {
+ qdr_exchange_t *next = DEQ_NEXT(ex);
+ qdr_exchange_free(ex);
+ ex = next;
+ }
+}
+
+
+// Exchange CREATE
+//
+//
+void qdra_config_exchange_create_CT(qdr_core_t *core,
+ qd_iterator_t *name,
+ qdr_query_t *query,
+ qd_parsed_field_t *in_body)
+{
+ qdr_exchange_t *ex = NULL;
+
+ query->status = QD_AMQP_BAD_REQUEST;
+
+ if (!qd_parse_is_map(in_body)) {
+ query->status.description = "Body of request must be a map";
+ goto exit;
+ }
+
+ if (!name) {
+ query->status.description = "exchange requires a unique name";
+ goto exit;
+ }
+
+ qd_parsed_field_t *address_field = qd_parse_value_by_key(in_body,
+
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ADDRESS]);
+ if (!address_field) {
+ query->status.description = "exchange address is mandatory";
+ goto exit;
+ }
+ qd_iterator_t *address = qd_parse_raw(address_field);
+
+ // check for duplicates
+ {
+ qdr_exchange_t *eptr = 0;
+ for (eptr = DEQ_HEAD(core->exchanges); eptr; eptr =
DEQ_NEXT(eptr)) {
+ if (qd_iterator_equal(address, eptr->address_str)) {
+ query->status.description = "duplicate exchange address";
+ goto exit;
+ } else if (qd_iterator_equal(name, eptr->name_str)) {
+ query->status.description = "duplicate exchange name";
+ goto exit;
+ }
+ }
+ }
+
+ qd_parsed_field_t *method_field = qd_parse_value_by_key(in_body,
+
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_MATCH_METHOD]);
+ qd_parse_tree_type_t method = QD_PARSE_TREE_AMQP_0_10;
+ if (method_field) {
+ if (qd_iterator_equal(qd_parse_raw(method_field), (const unsigned
char *)"mqtt")) {
+ method = QD_PARSE_TREE_MQTT;
+ } else if (!qd_iterator_equal(qd_parse_raw(method_field), (const
unsigned char *)"0-10")) {
+ query->status.description = "Exchange matchMethod must be
either '0-10' or 'mqtt'";
+ goto exit;
+ }
+ }
+
+ int phase = 0;
+ qd_parsed_field_t *phase_field = qd_parse_value_by_key(in_body,
+
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_PHASE]);
+ if (phase_field) {
+ phase = qd_parse_as_int(phase_field);
+ if (phase < 0 || phase > 9) {
+ query->status.description = "phase must be in the range 0-9";
+ goto exit;
+ }
+ }
+
+ qd_iterator_t *alternate = NULL;
+ int alt_phase = 0;
+ qd_parsed_field_t *alternate_field = qd_parse_value_by_key(in_body,
+
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALTERNATE]);
+ if (alternate_field) {
+ alternate = qd_parse_raw(alternate_field);
+ qd_parsed_field_t *alt_phase_field = qd_parse_value_by_key(in_body,
+
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALT_PHASE]);
+ if (alt_phase_field) {
+ alt_phase = qd_parse_as_int(alt_phase_field);
+ if (alt_phase < 0 || alt_phase > 9) {
+ query->status.description = "phase must be in the range
0-9";
+ goto exit;
+ }
+ }
+ }
+
+ ex = qdr_exchange(core, name, address, phase, alternate, alt_phase,
method);
+ if (ex) {
+ // @TODO(kgiusti) - for now, until the behavior is nailed down:
+ static int warn_user;
+ if (!warn_user) {
+ warn_user = 1;
+ qd_log(core->agent_log, QD_LOG_WARNING,
+ "The Exchange/Binding feature is currently
EXPERIMENTAL."
+ " Its functionality may change in future releases"
+ " of the Qpid Dispatch Router. Backward compatibility
is"
+ " not guaranteed.");
+ }
+ query->status = QD_AMQP_CREATED;
+ if (query->body) {
+ write_config_exchange_map(ex, query->body);
+ }
+ } else {
+ query->status.description = "failed to allocate exchange";
+ }
+
+ exit:
+
+ if (query->status.status == QD_AMQP_CREATED.status) {
+ qd_log(core->agent_log, QD_LOG_DEBUG,
+ "Exchange %s CREATED (id=%"PRIu64")", ex->name_str,
ex->identity);
+
+ } else {
+ qd_log(core->agent_log, QD_LOG_ERROR,
+ "Error performing CREATE of %s: %s",
config_exchange_entity_type, query->status.description);
+ // return a NULL body:
+ if (query->body) qd_compose_insert_null(query->body);
+ }
+
+ if (query->body) {
+ qdr_agent_enqueue_response_CT(core, query);
+ } else {
+ // no body == create from internal config parser
+ qdr_query_free(query);
+ }
+}
+
+
+// Exchange DELETE:
+//
+void qdra_config_exchange_delete_CT(qdr_core_t *core,
+ qdr_query_t *query,
+ qd_iterator_t *name,
+ qd_iterator_t *identity)
+{
+ qdr_exchange_t *ex = 0;
+
+ if (!name && !identity) {
+ query->status = QD_AMQP_BAD_REQUEST;
+ query->status.description = "No name or identity provided";
+ qd_log(core->agent_log, QD_LOG_ERROR, "Error performing DELETE of
%s: %s",
+ config_exchange_entity_type, query->status.description);
+ } else {
+ ex = find_exchange(core, identity, name);
+ if (ex) {
+ qd_log(core->agent_log, QD_LOG_DEBUG,
+ "Exchange %s DELETED (id=%"PRIu64")", ex->name_str,
ex->identity);
+ qdr_exchange_free(ex);
+ query->status = QD_AMQP_NO_CONTENT;
+ } else
+ query->status = QD_AMQP_NOT_FOUND;
+ }
+
+ qdr_agent_enqueue_response_CT(core, query);
+}
+
+// Exchange GET
+//
+void qdra_config_exchange_get_CT(qdr_core_t *core,
+ qd_iterator_t *name,
+ qd_iterator_t *identity,
+ qdr_query_t *query,
+ const char *columns[])
+{
+ qdr_exchange_t *ex = 0;
+
+ if (!name && !identity) {
+ query->status = QD_AMQP_BAD_REQUEST;
+ query->status.description = "No name or identity provided";
+ qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of
%s: %s",
+ config_exchange_entity_type, query->status.description);
+ }
+ else {
+ ex = find_exchange(core, identity, name);
+ if (!ex) {
+ query->status = QD_AMQP_NOT_FOUND;
+ }
+ else {
+ if (query->body) write_config_exchange_map(ex, query->body);
+ query->status = QD_AMQP_OK;
+ }
+ }
+
+ qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// Exchange GET first:
+//
+void qdra_config_exchange_get_first_CT(qdr_core_t *core, qdr_query_t
*query, int offset)
+{
+ //
+ // Queries that get this far will always succeed.
+ //
+ query->status = QD_AMQP_OK;
+
+ //
+ // If the offset goes beyond the set of objects, end the query now.
+ //
+ if (offset >= DEQ_SIZE(core->exchanges)) {
+ query->more = false;
+ qdr_agent_enqueue_response_CT(core, query);
+ return;
+ }
+
+ //
+ // Run to the object at the offset.
+ //
+ qdr_exchange_t *ex = DEQ_HEAD(core->exchanges);
+ for (int i = 0; i < offset && ex; i++)
+ ex = DEQ_NEXT(ex);
+ assert(ex);
+
+ //
+ // Write the columns of the object into the response body.
+ //
+ if (query->body) write_config_exchange_list(ex, query);
+
+ //
+ // Advance to the next address
+ //
+ query->next_offset = offset + 1;
+ query->more = !!DEQ_NEXT(ex);
+
+ //
+ // Enqueue the response.
+ //
+ qdr_agent_enqueue_response_CT(core, query);
+}
+
+// Exchange GET-NEXT
+//
+void qdra_config_exchange_get_next_CT(qdr_core_t *core, qdr_query_t *query)
+{
+ qdr_exchange_t *ex = 0;
+
+ if (query->next_offset < DEQ_SIZE(core->exchanges)) {
+ ex = DEQ_HEAD(core->exchanges);
+ for (int i = 0; i < query->next_offset && ex; i++)
+ ex = DEQ_NEXT(ex);
+ }
+
+ if (ex) {
+ //
+ // Write the columns of the addr entity into the response body.
+ //
+ if (query->body) write_config_exchange_list(ex, query);
+
+ //
+ // Advance to the next object
+ //
+ query->next_offset++;
+ query->more = !!DEQ_NEXT(ex);
+ } else
+ query->more = false;
+
+ //
+ // Enqueue the response.
+ //
+ qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// Binding CREATE
+void qdra_config_binding_create_CT(qdr_core_t *core,
+ qd_iterator_t *name,
+ qdr_query_t *query,
+ qd_parsed_field_t *in_body)
+{
+ qdr_binding_t *binding = NULL;
+ qdr_exchange_t *ex = NULL;
+ qd_iterator_t *key = NULL;
+
+ query->status = QD_AMQP_BAD_REQUEST;
+
+ if (!qd_parse_is_map(in_body)) {
+ query->status.description = "Body of request must be a map";
+ goto exit;
+ }
+
+ qd_parsed_field_t *exchange_field = qd_parse_value_by_key(in_body,
+
qdr_config_binding_columns[QDR_CONFIG_BINDING_EXCHANGE]);
+ if (!exchange_field) {
+ query->status.description = "Binding configuration requires an
exchange";
+ goto exit;
+ }
+
+ // lookup the exchange by its name:
+ ex = find_exchange(core, NULL, qd_parse_raw(exchange_field));
+ if (!ex) {
+ query->status.description = "Named exchange does not exist";
+ goto exit;
+ }
+
+ qd_parsed_field_t *next_hop_field = qd_parse_value_by_key(in_body,
+
qdr_config_binding_columns[QDR_CONFIG_BINDING_NEXTHOP]);
+ if (!next_hop_field) {
+ query->status.description = "No next hop specified";
+ goto exit;
+ }
+ qd_iterator_t *nhop = qd_parse_raw(next_hop_field);
+
+ qd_parsed_field_t *key_field = qd_parse_value_by_key(in_body,
+
qdr_config_binding_columns[QDR_CONFIG_BINDING_KEY]);
+ // if no pattern given, assume match all "#":
+ key = key_field ? qd_iterator_dup(qd_parse_raw(key_field)) :
qd_iterator_string("#", ITER_VIEW_ALL);
+
+ if (!qd_parse_tree_validate_pattern(ex->parse_tree, key)) {
+ query->status.description = "The binding key pattern is invalid";
+ goto exit;
+ }
+
+ qd_parsed_field_t *phase_field = qd_parse_value_by_key(in_body,
+
qdr_config_binding_columns[QDR_CONFIG_BINDING_PHASE]);
+ int phase = (phase_field ? qd_parse_as_int(phase_field) : 0);
+ if (phase < 0 || phase > 9) {
+ query->status.description = "phase must be in the range 0-9";
+ goto exit;
+ }
+
+ // check for duplicates: the name and the tuple (key, next hop, phase)
must
+ // be unique per exchange
+
+ for (qdr_binding_t *b = DEQ_HEAD(ex->bindings); b; b =
DEQ_NEXT_N(exchange_list, b)) {
+ if (name && b->name_str && qd_iterator_equal(name, b->name_str)) {
+ query->status.description = "Duplicate next hop name";
+ goto exit;
+ } else if (qd_iterator_equal(key, b->key_str) &&
+ qd_iterator_equal(nhop, b->next_hop->next_hop_str) &&
+ phase == b->next_hop->phase) {
+ query->status.description = "Next hop for key already exists";
+ goto exit;
+ }
+ }
+
+ binding = qdr_binding(ex, name, key, nhop, phase);
+ if (binding) {
+ query->status = QD_AMQP_CREATED;
+ if (query->body) {
+ write_config_binding_map(binding, query->body);
+ }
+ } else {
+ query->status.description = "Failed to allocate next hop";
+ }
+
+
+ exit:
+
+ if (query->status.status == QD_AMQP_CREATED.status) {
+ qd_log(core->agent_log, QD_LOG_DEBUG,
+ "Exchange %s Binding %s -> %s CREATED (id=%"PRIu64")",
ex->name_str,
+ binding->key_str, binding->next_hop->next_hop_str,
binding->identity);
+ } else {
+ qd_log(core->agent_log, QD_LOG_ERROR,
+ "Error performing CREATE of %s: %s",
+ config_binding_entity_type,
+ query->status.description);
+ // return a NULL body:
+ if (query->body) qd_compose_insert_null(query->body);
+ }
+
+ if (query->body) {
+ qdr_agent_enqueue_response_CT(core, query);
+ } else {
+ // no body == create from internal config parser
+ qdr_query_free(query);
+ }
+
+ if (key) qd_iterator_free(key);
+}
+
+
+// Binding DELETE
+//
+void qdra_config_binding_delete_CT(qdr_core_t *core,
+ qdr_query_t *query,
+ qd_iterator_t *name,
+ qd_iterator_t *identity)
+{
+ if (!identity && !name) {
+ query->status = QD_AMQP_BAD_REQUEST;
+ query->status.description = "No binding name or identity provided";
+ qd_log(core->agent_log, QD_LOG_ERROR, "Error performing DELETE of
%s: %s",
+ config_binding_entity_type, query->status.description);
+ } else {
+ qdr_binding_t *binding = find_binding(core, identity, name);
+ if (!binding) {
+ query->status = QD_AMQP_NOT_FOUND;
+ } else {
+ qd_log(core->agent_log, QD_LOG_DEBUG,
+ "Binding %s -> %s on exchange %s DELETED
(id=%"PRIu64")",
+ binding->key_str,
+ binding->next_hop->next_hop_str,
+ binding->exchange->name_str,
+ binding->identity);
+ qdr_binding_free(binding);
+ query->status = QD_AMQP_NO_CONTENT;
+ }
+ }
+
+ qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// Binding GET
+//
+void qdra_config_binding_get_CT(qdr_core_t *core,
+ qd_iterator_t *name,
+ qd_iterator_t *identity,
+ qdr_query_t *query,
+ const char *columns[])
+{
+ if (!identity && !name) {
+ query->status = QD_AMQP_BAD_REQUEST;
+ query->status.description = "No binding name or identity provided";
+ qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of
%s: %s",
+ config_binding_entity_type, query->status.description);
+ } else {
+ qdr_binding_t *binding = find_binding(core, identity, name);
+ if (binding == 0) {
+ query->status = QD_AMQP_NOT_FOUND;
+ }
+ else {
+ if (query->body) write_config_binding_map(binding,
query->body);
+ query->status = QD_AMQP_OK;
+ }
+ }
+
+ qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// Binding GET first
+//
+void qdra_config_binding_get_first_CT(qdr_core_t *core, qdr_query_t
*query, int offset)
+{
+ query->status = QD_AMQP_OK;
+
+ qdr_binding_t *binding = get_binding_at_index(core, offset);
+ if (!binding) {
+ query->more = false;
+ qdr_agent_enqueue_response_CT(core, query);
+ return;
+ }
+
+ if (query->body) write_config_binding_list(binding, query);
+ query->next_offset = offset + 1;
+ query->more = !!(DEQ_NEXT_N(exchange_list, binding) ||
DEQ_NEXT(binding->exchange));
+ qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// Binding GET-NEXT
+//
+void qdra_config_binding_get_next_CT(qdr_core_t *core, qdr_query_t *query)
+{
+ qdr_binding_t *binding = get_binding_at_index(core,
query->next_offset);
+ if (binding) {
+ if (query->body) write_config_binding_list(binding, query);
+ query->next_offset++;
+ query->more = !!(DEQ_NEXT_N(exchange_list, binding) ||
DEQ_NEXT(binding->exchange));
+ } else
+ query->more = false;
+ qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// create a new iterator with its own copy of the data. caller must free
both the
+// iterator and the returned data buffer. Used for retaining data
extracted
+// from query messages.
+static qd_iterator_t *clone_iterator(qd_iterator_t *iter, unsigned char
**data)
--- End diff --
Can this function be included as a part of the iterator API instead of
being a static function ? Like qd_clone_iterator() ?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]