Hi Folks,
I'm fairly new to the list, about 3 working weeks, so if I get a few
terms wrong, please bear with me!
I'm working on adding federation support for the HeadersExchange and
the XMLExchange. Headers comes first; once that's complete, the XML
Exchange will follow, as it's basically the same pattern.
There are several issues to address for the HeadersExchange:
1) adding support for dynamic bindings
2) updating the code to allow the qpid-route command to be used to
create routes between exchanges of type "headers" (support for fedOps
etc.) e.g. after creating a "link" the following cmd should work:
"qpid-route dynamic add localhost:5673 localhost:5672 amq.match"
3) updating the code to allow for the federated propagation of the
additional headers/args used in creating the binding, (x-match plus the
actual binding headers) so that messages are correctly propagated
between brokers. The test case for this is producing messages to one
broker and reading them from the other. I'd attach examples, but I
understand attachments get stripped on this list :( I do have a WIP
patch that I can mail directly if anyone's interested. Note: it's heavy
with trace statements, so definitely WIP.
4) updating the code to propagate unbind() operations
5) adding support for reOrigin when things get a bit flappy. (flappy is
a tech term :) )
I've had good success so far, and have successfully completed 1), 2) &
3) in that messages are now being federated across instances of header
exchanges in multiple brokers.
But I need to implement the unbind and reOrigin cases for completeness
before I can submit a proper patch. Both of these cases require a lookup
of an existing binding and updating the details of their associated
fedBinding instances. In the headers exchange, unlike the
Direct/Topic/Fanout exchanges, there needs to be a 1:1 affinity between
a binding and a fedBinding. These other exchanges use a simple match
based on either a match pattern or a routingKey.
I've had some discussion with some folks on this already off list, but I
wanted to get some feedback from a wider audience, as I've only been in
this code for a short time. So here goes:
The current HeadersExchange uses a CopyOnWriteArray (defined in
cpp/src/qpid/sys). There are several examples of using this class in the
exchange context, e.g. the DirectExchange. However, my case is slightly
different, as I believe I need to be able to do a lookup for an existing
binding using more than just a simple bindingKey.
The HeadersExchange (like the XML exchange), for anyone who's not been
near that code recently, uses matching during a route() call based on
the args/headers rather than a simple key or match pattern string.
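For anyone who wants the matching rule spelled out, here is a minimal
sketch of x-match style matching as I understand it. It is not the
actual HeadersExchange code: Headers is a std::map stand-in for
FieldTable, matchHeaders is a hypothetical name, and treating any
x-match value other than "any" as "all" is my own assumption.

```cpp
#include <cstddef>
#include <map>
#include <string>

// Stand-in for FieldTable (assumption): string keys mapped to string values.
typedef std::map<std::string, std::string> Headers;

// Hypothetical helper: true if msgHeaders satisfy the binding arguments.
// "any": at least one binding header matches; otherwise all must match.
bool matchHeaders(const Headers& bindArgs, const Headers& msgHeaders)
{
    Headers::const_iterator xm = bindArgs.find("x-match");
    const bool matchAny = (xm != bindArgs.end() && xm->second == "any");

    std::size_t needed = 0;   // binding headers other than x-match
    std::size_t matched = 0;  // those found in the message with equal value
    for (Headers::const_iterator i = bindArgs.begin(); i != bindArgs.end(); ++i) {
        if (i->first == "x-match") continue;
        ++needed;
        Headers::const_iterator m = msgHeaders.find(i->first);
        if (m != msgHeaders.end() && m->second == i->second) ++matched;
    }
    return matchAny ? (matched > 0) : (matched == needed);
}
```

Note that the bindingKey plays no part in this decision, which is why I
doubt it identifies a binding on its own.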
So, in the HeadersExchange:
1) During the bind() call, a bindingKey is passed in. Is this key of any
value at all? i.e. is there always a 1:1 affinity between a bindingKey
and a successful binding? Can I use this key for a successful retrieval
of the associated binding? Looking at the impl of route(), this doesn't
appear to be the case; it's not even used in that method.
2) I've not been able to find any examples of a usage of a
CopyOnWriteArray where a lookup is done and an element is then updated.
That is, all the lookups appear to be const, meaning I can't update them.
For the federated unbind case I need to be able to look up an element
of a CopyOnWriteArray and update the contents of its fedBinding, so
having access to a const element isn't of much use. Here are my BoundKey
and Bindings definitions:
    struct BoundKey
    {
        Binding::shared_ptr binding;
        FedBinding fedBinding;
        BoundKey(Binding::shared_ptr binding_) : binding(binding_) {}
    };
    typedef qpid::sys::CopyOnWriteArray<BoundKey> Bindings;
    Bindings bindings;
So, has anyone used a CopyOnWriteArray to retrieve an element and then
update it? All the uses I've been able to find so far are read only, so
a const retval is ok in those cases.
I need to be able to do a lookup and get a non-const reference that I
can update.
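To make clear what I'm after, here is a minimal, self-contained sketch
of a "copy, modify, publish" update on a copy-on-write array. CowArray,
modifyIf and Entry are hypothetical names of my own; this is a stand-in
written with std::shared_ptr and std::mutex, not the real
qpid::sys::CopyOnWriteArray, and I'm not claiming the real class offers
such a method.

```cpp
#include <memory>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for a copy-on-write array (NOT the qpid class).
template <class T>
class CowArray
{
  public:
    typedef std::shared_ptr<const std::vector<T> > ConstPtr;

    // Append by copying the current vector and publishing the copy.
    void add(const T& t)
    {
        std::lock_guard<std::mutex> l(lock);
        ArrayPtr copy(array ? new std::vector<T>(*array) : new std::vector<T>());
        copy->push_back(t);
        array = copy;
    }

    // Readers get an immutable snapshot; elements reached through it are const.
    ConstPtr snapshot()
    {
        std::lock_guard<std::mutex> l(lock);
        return array;
    }

    // Copy the vector, apply mod to the first element accepted by pred,
    // then publish the copy. Existing snapshots are left untouched.
    template <class Pred, class Mod>
    bool modifyIf(Pred pred, Mod mod)
    {
        std::lock_guard<std::mutex> l(lock);
        if (!array) return false;
        ArrayPtr copy(new std::vector<T>(*array));
        for (typename std::vector<T>::iterator i = copy->begin(); i != copy->end(); ++i) {
            if (pred(*i)) {
                mod(*i);
                array = copy;
                return true;
            }
        }
        return false;
    }

  private:
    typedef std::shared_ptr<std::vector<T> > ArrayPtr;
    std::mutex lock;
    ArrayPtr array;
};

// Hypothetical element type standing in for BoundKey.
struct Entry
{
    std::string key;
    int fedCount;
};
```

The idea is that the caller never holds a non-const reference into a
published vector; the update is passed in as a functor and applied to a
private copy under the lock.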
I tried the following, but keep getting a bunch of const related
compiler errors:
    HeadersExchange::BoundKey*
    HeadersExchange::findIfExists(Queue::shared_ptr queue,
                                  const std::string& key,
                                  const FieldTable* args)
    {
        BoundKey* bk = 0;
        Bindings::ConstPtr p = bindings.snapshot();
        if (p.get()) {
            for (std::vector<BoundKey>::iterator i = p->begin();
                 i != p->end(); ++i) {
                if (((*i).binding->queue == queue) &&
                    ((*i).binding->key == key) &&
                    equal((*i).binding->args, *args)) {
                    bk = &(*i);
                    break;
                }
            }
        }
        return bk;
    }
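For comparison, a read-only version of the same kind of lookup does
compile for me when everything is const. The sketch below is standalone
and uses hypothetical names (Item, findByKey) with a plain
std::shared_ptr<const std::vector<Item> > in place of
Bindings::ConstPtr, so it only illustrates the const side of the
problem, not a fix.

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical element type standing in for BoundKey.
struct Item
{
    std::string key;
    int value;
};

// Assumed shape of a snapshot: a shared pointer to a const vector.
typedef std::shared_ptr<const std::vector<Item> > ConstPtr;

// Read-only lookup: a const vector only hands out const_iterators, so
// the result has to be a pointer to const. The caller must keep 'snap'
// alive for as long as the returned pointer is used.
const Item* findByKey(const ConstPtr& snap, const std::string& key)
{
    if (!snap) return 0;
    for (std::vector<Item>::const_iterator i = snap->begin(); i != snap->end(); ++i) {
        if (i->key == key) return &(*i);
    }
    return 0;
}
```

Swapping const_iterator for iterator, or const Item* for Item*, gives
the same kind of const errors as my findIfExists() above.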
I also thought about having a fn() that returns a bool and takes an
inout param of type BoundKey&. I tried that and still got const-related
compiler errors.
If anyone can give me a hand, I'd really appreciate it!
best regards,
Sam.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]