Author: gsim
Date: Tue May 21 22:35:57 2013
New Revision: 1485001
URL: http://svn.apache.org/r1485001
Log:
QPID-4591: patch from Ernie Allen to add queue sequence number to messages
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1485001&r1=1485000&r2=1485001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue May 21 22:35:57 2013
@@ -755,6 +755,7 @@ void Queue::push(Message& message, bool
{
Mutex::ScopedLock locker(messageLock);
message.setSequence(++sequence);
+ if (settings.sequencing) message.addAnnotation(settings.sequenceKey,
(uint32_t)sequence);
messages->publish(message);
listeners.populate(copy);
observeEnqueue(message, locker);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1485001&r1=1485000&r2=1485001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp Tue May 21 22:35:57 2013
@@ -63,6 +63,7 @@ const std::string LVQ_LEGACY("qpid.last_
const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
const std::string LVQ_LEGACY_NOBROWSE("qpid.last_value_queue_no_browse");
+const std::string SEQUENCING("qpid.queue_msg_sequence");
bool handleFairshareSetting(const std::string& basename, const std::string&
key, const qpid::types::Variant& value, QueueSettings& settings)
{
@@ -97,7 +98,8 @@ QueueSettings::QueueSettings(bool d, boo
noLocal(false),
isBrowseOnly(false),
autoDeleteDelay(0),
- alertRepeatInterval(60)
+ alertRepeatInterval(60),
+ sequencing(false)
{}
bool QueueSettings::handle(const std::string& key, const qpid::types::Variant&
value)
@@ -203,6 +205,10 @@ bool QueueSettings::handle(const std::st
} else if (key == PAGE_FACTOR) {
pageFactor = value;
return true;
+ } else if (key == SEQUENCING) {
+ sequenceKey = value.getString();
+ sequencing = !sequenceKey.empty();
+ return true;
} else if (key == FILTER) {
filter = value.asString();
return true;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h?rev=1485001&r1=1485000&r2=1485001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h Tue May 21 22:35:57 2013
@@ -83,6 +83,10 @@ struct QueueSettings
uint64_t maxFileSize;
uint64_t maxFileCount;
+ std::string sequenceKey;
+ // store bool to avoid testing string value
+ bool sequencing;
+
std::string filter;
//yuck, yuck
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py?rev=1485001&r1=1485000&r2=1485001&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py Tue May 21 22:35:57 2013
@@ -123,3 +123,83 @@ class GeneralTests(Base):
self.assertEqual(rx_alt.available(), 0, "No further messages should be
received via the alternate exchange")
sess4.close()
+
+class SequenceNumberTests(Base):
+ """
+ Tests of ring queue sequence number
+ """
+
+ def fail(self, text=None):
+ if text:
+ print "Fail: %r" % text
+ assert None
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self, name="ring-sequence-queue",
key="qpid.queue_msg_sequence"):
+ addr = "%s; {create:sender, delete:always, node: {x-declare:
{arguments: {'qpid.queue_msg_sequence':'%s', 'qpid.policy_type':'ring',
'qpid.max_count':4}}}}" % (name, key)
+ sender = self.ssn.sender(addr)
+ return sender
+
+ def test_create_sequence_queue(self):
+ """
+ Test a queue with sequencing can be created
+ """
+
+ #setup, declare a queue
+ try:
+ sender = self.setup_sender()
+ except:
+ self.fail("Unable to create ring queue with sequencing enabled")
+
+ def test_get_sequence_number(self):
+ """
+ Test retrieving sequence number for queues
+ """
+
+ key = "k"
+ sender = self.setup_sender("ring-sequence-queue2", key=key)
+
+ # send and receive 1 message and test the sequence number
+ msg = Message()
+ sender.send(msg)
+
+ receiver = self.ssn.receiver("ring-sequence-queue2")
+ msg = receiver.fetch(1)
+ try:
+ seqNo = msg.properties[key]
+ if int(seqNo) != 1:
+ txt = "Unexpected sequence number. Should be 1. Received (%s)"
% seqNo
+ self.fail(txt)
+ except:
+ txt = "Unable to get key (%s) from message properties" % key
+ self.fail(txt)
+ receiver.close()
+
+ def test_sequence_number_gap(self):
+ """
+ Test that sequence number for ring queues shows gaps when queue
+ messages are overwritten
+ """
+ key = "qpid.seq"
+ sender = self.setup_sender("ring-sequence-queue3", key=key)
+ receiver = self.ssn.receiver("ring-sequence-queue3")
+
+ msg = Message()
+ sender.send(msg)
+ msg = receiver.fetch(1)
+
+ # send 5 more messages to overflow the queue
+ for i in range(5):
+ sender.send(msg)
+
+ msg = receiver.fetch(1)
+ seqNo = msg.properties[key]
+ if int(seqNo) != 3:
+ txt = "Unexpected sequence number. Should be 3. Received (%s)" %
seqNo
+ self.fail(txt)
+ receiver.close()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]