Author: aconway
Date: Thu Jan 19 23:08:18 2012
New Revision: 1233680
URL: http://svn.apache.org/viewvc?rev=1233680&view=rev
Log:
QPID-3603: Check for gaps in sequence numbers in qpid-receive.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/qpid-receive.cpp
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/qpid-receive.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/qpid-receive.cpp?rev=1233680&r1=1233679&r2=1233680&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/qpid-receive.cpp Thu Jan 19
23:08:18 2012
@@ -53,6 +53,7 @@ struct Options : public qpid::Options
bool forever;
uint messages;
bool ignoreDuplicates;
+ bool verifySequence;
bool checkRedelivered;
uint capacity;
uint ackFrequency;
@@ -76,6 +77,7 @@ struct Options : public qpid::Options
forever(false),
messages(0),
ignoreDuplicates(false),
+ verifySequence(false),
checkRedelivered(false),
capacity(1000),
ackFrequency(100),
@@ -98,6 +100,7 @@ struct Options : public qpid::Options
("forever,f", qpid::optValue(forever), "ignore timeout and wait
forever")
("messages,m", qpid::optValue(messages, "N"), "Number of messages
to receive; 0 means receive indefinitely")
("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect
and ignore duplicates (by checking 'sn' header)")
+ ("verify-sequence", qpid::optValue(verifySequence), "Verify there
are no gaps in the message sequence (by checking 'sn' header)")
("check-redelivered", qpid::optValue(checkRedelivered), "Fails
with exception if a duplicate is not marked as redelivered (only relevant when
ignore-duplicates is selected)")
("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0
implies no pre-fetch)")
("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack
frequency (0 implies none of the messages will get accepted)")
@@ -145,22 +148,29 @@ struct Options : public qpid::Options
const string EOS("eos");
const string SN("sn");
+/** Check for duplicate or dropped messages by sequence number */
class SequenceTracker
{
- uint lastSn;
public:
- SequenceTracker() : lastSn(0) {}
+ SequenceTracker(const Options& o) : opts(o), lastSn(0) {}
- bool isDuplicate(Message& message)
- {
+ /** Return true if the message should be processed, false if it should be
ignored. */
+ bool track(Message& message) {
uint sn = message.getProperties()[SN];
- if (lastSn < sn) {
- lastSn = sn;
- return false;
- } else {
- return true;
- }
+ bool duplicate = (sn <= lastSn);
+ bool dropped = (sn > lastSn+1);
+ if (opts.verifySequence && dropped)
+ throw Exception(QPID_MSG("Gap in sequence numbers " << lastSn <<
"-" << sn));
+ bool ignore = duplicate && opts.ignoreDuplicates;
+ if (ignore && opts.checkRedelivered && !message.getRedelivered())
+ throw qpid::Exception("duplicate sequence number received, message
not marked as redelivered!");
+ if (!duplicate) lastSn = sn;
+ return !ignore;
}
+
+ private:
+ const Options& opts;
+ uint lastSn;
};
}} // namespace qpid::tests
@@ -182,13 +192,12 @@ int main(int argc, char ** argv)
Message msg;
uint count = 0;
uint txCount = 0;
- SequenceTracker sequenceTracker;
+ SequenceTracker sequenceTracker(opts);
Duration timeout = opts.getTimeout();
bool done = false;
Reporter<ThroughputAndLatency> reporter(std::cout,
opts.reportEvery, opts.reportHeader);
if (!opts.readyAddress.empty())
session.createSender(opts.readyAddress).send(msg);
-
// For receive rate calculation
qpid::sys::AbsTime start = qpid::sys::now();
int64_t interval = 0;
@@ -198,7 +207,7 @@ int main(int argc, char ** argv)
while (!done && receiver.fetch(msg, timeout)) {
reporter.message(msg);
- if (!opts.ignoreDuplicates ||
!sequenceTracker.isDuplicate(msg)) {
+ if (sequenceTracker.track(msg)) {
if (msg.getContent() == EOS) {
done = true;
} else {
@@ -219,8 +228,6 @@ int main(int argc, char ** argv)
std::cout << msg.getContent() << std::endl;//TODO:
handle map or list messages
if (opts.messages && count >= opts.messages) done =
true;
}
- } else if (opts.checkRedelivered && !msg.getRedelivered()) {
- throw qpid::Exception("duplicate sequence number received,
message not marked as redelivered!");
}
if (opts.tx && (count % opts.tx == 0)) {
if (opts.rollbackFrequency && (++txCount %
opts.rollbackFrequency == 0)) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]