|
LVQ has been edited by Carl Trieloff (Dec 29, 2008). Content:Understanding LVQLast Value Queues are useful youUser Documentation are only interested in the latest value entered into a queue. LVQ semantics are typically used for things like stock symbol updates when all you care about is the latest value for example. Qpid C++ M4 or later supports two types of LVQ semantics:
LVQ semantics:LVQ uses a header for a key, if the key matches it replaces the message in-place in the queue except These two exceptions protect the consumer from missing the last update where a consumer or browser accesses a message and an update comes with the same key. [localhost tests]$ ./echotest --mode create_lvq [localhost tests]$ ./echotest --mode write Sending Data: key1=key1.0x7fffdf3f3180 Sending Data: key2=key2.0x7fffdf3f3180 Sending Data: key3=key3.0x7fffdf3f3180 Sending Data: key1=key1.0x7fffdf3f3180 Sending Data: last=last [localhost tests]$ ./echotest --mode browse Receiving Data:key1.0x7fffdf3f3180 Receiving Data:key2.0x7fffdf3f3180 Receiving Data:key3.0x7fffdf3f3180 Receiving Data:last [localhost tests]$ ./echotest --mode write Sending Data: key1=key1.0x7fffe4c7fa10 Sending Data: key2=key2.0x7fffe4c7fa10 Sending Data: key3=key3.0x7fffe4c7fa10 Sending Data: key1=key1.0x7fffe4c7fa10 Sending Data: last=last [localhost tests]$ ./echotest --mode consume Receiving Data:key1.0x7fffdf3f3180 Receiving Data:key2.0x7fffdf3f3180 Receiving Data:key3.0x7fffdf3f3180 Receiving Data:last Receiving Data:key1.0x7fffe4c7fa10 Receiving Data:key2.0x7fffe4c7fa10 Receiving Data:key3.0x7fffe4c7fa10 Receiving Data:last
#include <qpid/client/AsyncSession.h> #include <iostream> using namespace qpid::client; enum Mode Unknown macro: { CREATE_LVQ, CREATE_LVQ_NO_BROWSE, WRITE, BROWSE, CONSUME} ;const char* modeNames[] = Unknown macro: { "create_lvq","create_lvq_no_browse","write","browse","consume" } ;
// istream/ostream ops so Options can read/display Mode. Unknown macro: {
string s;
in >> s;
int i = find(modeNames, modeNames+5, s) - modeNames;
if (i >= 5) throw Exception("Invalid mode}
ostream& operator<<(ostream& out, Mode mode) Unknown macro: {
return out << modeNames[mode];
}
struct Args : public qpid::Options, Args() : qpid::Options("Simple latency test optins"), help(false), mode(BROWSE) Unknown macro: {
using namespace qpid;
addOptions()
("help", optValue(help), "Print this usage statement")
("broker,b", optValue(host, "HOST"), "Broker host to connect to")
("port,p", optValue(port, "PORT"), "Broker port to connect to")
("username", optValue(username, "USER"), "user name for broker log in.")
("password", optValue(password, "PASSWORD"), "password for broker log in.")
("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.")
("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay")
("mode", optValue(mode, "'see below'"), "Action mode."
"ncreate_lvq}
}; class Listener : public MessageListener Unknown macro: {
private} ;
Listener::Listener(Session& s) : void Listener::setup(bool browse) Unknown macro: {
// set queue mode
args.setOrdering(browse?LVQ_NO_BROWSE}
void Listener::browse() Unknown macro: {
subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl}
void Listener::consume() Unknown macro: {
subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl}
void Listener::send(std::string kv) Unknown macro: {
request.getDeliveryProperties().setRoutingKey(queue);
std}
void Listener::received(Message& response) cout << "Receiving Data:" << response.getData() << std::endl; Unknown macro: {
subscriptions.cancel(queue);
} */ } int main(int argc, char** argv) if (opts.help) Unknown macro: {
std}
Connection connection; switch (opts.mode) Unknown macro: {
case CONSUME}
connection.close(); Unknown macro: {
std} return 1; } |
Unsubscribe or edit your notifications preferences
