Author: aconway
Date: Thu Jan 19 23:05:24 2012
New Revision: 1233658
URL: http://svn.apache.org/viewvc?rev=1233658&view=rev
Log:
QPID-3603: Set bridge sync parameter to 1.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1233658&r1=1233657&r2=1233658&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Thu
Jan 19 23:05:24 2012
@@ -116,7 +116,7 @@ bool DeliveryRecord::accept(TransactionC
if (acquired) {
queue->dequeue(ctxt, msg);
} else if (isDelayedCompletion) {
- //TODO: this is a nasty way to do this; change it
+ // FIXME aconway 2011-12-05: This should be done in HA code.
msg.payload->getIngressCompletion().finishCompleter();
QPID_LOG(debug, "Completed " << msg.payload.get());
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1233658&r1=1233657&r2=1233658&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jan
19 23:05:24 2012
@@ -36,6 +36,7 @@
namespace {
const std::string QPID_REPLICATOR_("qpid.replicator-");
const std::string TYPE_NAME("qpid.queue-replicator");
+const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
}
namespace qpid {
@@ -50,6 +51,7 @@ QueueReplicator::QueueReplicator(boost::
{
// FIXME aconway 2011-11-24: consistent logging.
QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " <<
q->getSettings());
+ // Declare the replicator bridge.
queue->getBroker()->getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
@@ -77,11 +79,11 @@ void QueueReplicator::initializeBridge(B
framing::FieldTable settings;
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER,
queue->getPosition());
+ settings.setInt(QPID_SYNC_FREQUENCY, 1);
qpid::framing::SequenceNumber oldest;
if (queue->getOldest(oldest))
settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER,
oldest);
-
- peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1,
0, false, "", 0, settings);
+ peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/,
0/*acquire-pre-acquired*/, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src <<
" to " << args.i_dest);
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1233658&r1=1233657&r2=1233658&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
Thu Jan 19 23:05:24 2012
@@ -169,12 +169,12 @@ void ReplicatingSubscription::enqueued(c
// Called with lock held.
void ReplicatingSubscription::generateDequeueEvent()
{
+ QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << "
" << range);
string buf(range.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
range.encode(buffer);
range.clear();
buffer.reset();
-
//generate event message
boost::intrusive_ptr<Message> event = new Message();
AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233658&r1=1233657&r2=1233658&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19
23:05:24 2012
@@ -106,6 +106,18 @@ class ShortTests(BrokerTest):
verify(b, "1", p)
verify(b, "2", p)
+ # Test a series of messages, enqueue and dequeue.
+ s = p.sender(queue("foo","all"))
+ msgs = [str(i) for i in range(10)]
+ for m in msgs: s.send(Message(m))
+ self.assert_browse_retry(b, "foo", msgs)
+ self.assert_browse_retry(p, "foo", msgs)
+ r = p.receiver("foo")
+ for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
+ p.acknowledge()
+ self.assert_browse_retry(p, "foo", [])
+ self.assert_browse_retry(b, "foo", [])
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] +
sys.argv[1:])
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]