LVQ has been edited by Carl Trieloff (Dec 29, 2008).

(View changes)

Content:

Understanding LVQ

Last Value Queues are useful youUser Documentation are only interested in the latest value entered into a queue. LVQ semantics are typically used for things like stock symbol updates when all you care about is the latest value for example.

Qpid C++ M4 or later supports two types of LVQ semantics:

  • LVQ
  • LVQ_NO_BROWSE

LVQ semantics:

LVQ uses a header for a key, if the key matches it replaces the message in-place in the queue except
a.) if the message with the matching key has been acquired
b.) if the message with the matching key has been browsed
In these two cases the message is placed into the queue in FIFO, if another message with the same key is received it will the 'un-accessed' message with the same key will be replaced

These two exceptions protect the consumer from missing the last update where a consumer or browser accesses a message and an update comes with the same key.

[localhost tests]$ ./echotest --mode create_lvq
[localhost tests]$ ./echotest --mode write
Sending Data: key1=key1.0x7fffdf3f3180
Sending Data: key2=key2.0x7fffdf3f3180
Sending Data: key3=key3.0x7fffdf3f3180
Sending Data: key1=key1.0x7fffdf3f3180
Sending Data: last=last
[localhost tests]$ ./echotest --mode browse
Receiving Data:key1.0x7fffdf3f3180
Receiving Data:key2.0x7fffdf3f3180
Receiving Data:key3.0x7fffdf3f3180
Receiving Data:last
[localhost tests]$ ./echotest --mode write
Sending Data: key1=key1.0x7fffe4c7fa10
Sending Data: key2=key2.0x7fffe4c7fa10
Sending Data: key3=key3.0x7fffe4c7fa10
Sending Data: key1=key1.0x7fffe4c7fa10
Sending Data: last=last
[localhost tests]$ ./echotest --mode consume
Receiving Data:key1.0x7fffdf3f3180
Receiving Data:key2.0x7fffdf3f3180
Receiving Data:key3.0x7fffdf3f3180
Receiving Data:last
Receiving Data:key1.0x7fffe4c7fa10
Receiving Data:key2.0x7fffe4c7fa10
Receiving Data:key3.0x7fffe4c7fa10
Receiving Data:last

An example

h2. LVQ_NO_BROWSE semantics:

LVQ uses a header for a key, if the key matches it replaces the message in-place in the queue except
a.) if the message with the matching key has been acquired
In these two cases the message is placed into the queue in FIFO, if another message with the same key is received it will the 'un-accessed' message with the same key will be replaced

Note, in this case browsed messaged are not invalidated, so updates can be missed.

An example

localhost tests$ ./echotest --mode create_lvq_no_browse
localhost tests$ ./echotest --mode write
Sending Data: key1=key1.0x7fffce5fb390
Sending Data: key2=key2.0x7fffce5fb390
Sending Data: key3=key3.0x7fffce5fb390
Sending Data: key1=key1.0x7fffce5fb390
Sending Data: last=last
localhost tests$ ./echotest --mode write
Sending Data: key1=key1.0x7fff346ae440
Sending Data: key2=key2.0x7fff346ae440
Sending Data: key3=key3.0x7fff346ae440
Sending Data: key1=key1.0x7fff346ae440
Sending Data: last=last
localhost tests$ ./echotest --mode browse
Receiving Data:key1.0x7fff346ae440
Receiving Data:key2.0x7fff346ae440
Receiving Data:key3.0x7fff346ae440
Receiving Data:last
localhost tests$ ./echotest --mode browse
Receiving Data:key1.0x7fff346ae440
Receiving Data:key2.0x7fff346ae440
Receiving Data:key3.0x7fff346ae440
Receiving Data:last
localhost tests$ ./echotest --mode write
Sending Data: key1=key1.0x7fff606583e0
Sending Data: key2=key2.0x7fff606583e0
Sending Data: key3=key3.0x7fff606583e0
Sending Data: key1=key1.0x7fff606583e0
Sending Data: last=last
localhost tests$ ./echotest --mode consume
Receiving Data:key1.0x7fff606583e0
Receiving Data:key2.0x7fff606583e0
Receiving Data:key3.0x7fff606583e0
Receiving Data:last
localhost tests$

h2. Example source

/*
*

  • Licensed to the Apache Software Foundation (ASF) under one
  • or more contributor license agreements. See the NOTICE file
  • distributed with this work for additional information
  • regarding copyright ownership. The ASF licenses this file
  • to you under the Apache License, Version 2.0 (the
  • "License"); you may not use this file except in compliance
  • with the License. You may obtain a copy of the License at
  • http://www.apache.org/licenses/LICENSE-2.0
  • Unless required by applicable law or agreed to in writing,
  • software distributed under the License is distributed on an
  • "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  • KIND, either express or implied. See the License for the
  • specific language governing permissions and limitations
  • under the License.
    *
    */

#include <qpid/client/AsyncSession.h>
#include <qpid/client/Connection.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/QueueOptions.h>

#include <iostream>

using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
using namespace qpid;
using namespace std;

enum Mode

Unknown macro: { CREATE_LVQ, CREATE_LVQ_NO_BROWSE, WRITE, BROWSE, CONSUME}
;
const char* modeNames[] =
Unknown macro: { "create_lvq","create_lvq_no_browse","write","browse","consume" }
;

// istream/ostream ops so Options can read/display Mode.
istream& operator>>(istream& in, Mode& mode)

Unknown macro: { string s; in >> s; int i = find(modeNames, modeNames+5, s) - modeNames; if (i >= 5) throw Exception("Invalid mode}

ostream& operator<<(ostream& out, Mode mode)

Unknown macro: { return out << modeNames[mode]; }

struct Args : public qpid::Options,
public qpid::client::ConnectionSettings
{
bool help;
Mode mode;

Args() : qpid::Options("Simple latency test optins"), help(false), mode(BROWSE)

Unknown macro: { using namespace qpid; addOptions() ("help", optValue(help), "Print this usage statement") ("broker,b", optValue(host, "HOST"), "Broker host to connect to") ("port,p", optValue(port, "PORT"), "Broker port to connect to") ("username", optValue(username, "USER"), "user name for broker log in.") ("password", optValue(password, "PASSWORD"), "password for broker log in.") ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.") ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay") ("mode", optValue(mode, "'see below'"), "Action mode." "ncreate_lvq}

};

class Listener : public MessageListener

Unknown macro: { private}
;

Listener::Listener(Session& s) :
session(s), subscriptions(s),
queue("LVQtester")
{}

void Listener::setup(bool browse)

Unknown macro: { // set queue mode args.setOrdering(browse?LVQ_NO_BROWSE}

void Listener::browse()

Unknown macro: { subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl}

void Listener::consume()

Unknown macro: { subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl}

void Listener::send(std::string kv)

Unknown macro: { request.getDeliveryProperties().setRoutingKey(queue); std}

void Listener::received(Message& response)
{

cout << "Receiving Data:" << response.getData() << std::endl;
/* if (response.getData() == "last")

Unknown macro: { subscriptions.cancel(queue); }

*/
}

int main(int argc, char** argv)
{
Args opts;
opts.parse(argc, argv);

if (opts.help)

Unknown macro: { std}

Connection connection;
try {
connection.open(opts);
Session session = connection.newSession();
Listener listener(session);

switch (opts.mode)

Unknown macro: { case CONSUME}

connection.close();
return 0;
} catch(const std::exception& error)

Unknown macro: { std}

return 1;
}


Reply via email to