Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1180050&r1=1180049&r2=1180050&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Oct 7 14:21:48 2011 @@ -56,12 +56,12 @@ class TestConsumer : public virtual Cons public: typedef boost::shared_ptr<TestConsumer> shared_ptr; - intrusive_ptr<Message> last; + QueuedMessage last; bool received; - TestConsumer(bool acquire = true):Consumer(acquire), received(false) {}; + TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {}; virtual bool deliver(QueuedMessage& msg){ - last = msg.payload; + last = msg; received = true; return true; }; @@ -149,16 +149,16 @@ QPID_AUTO_TEST_CASE(testConsumers){ queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg1.get(), c1->last.get()); + BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get()); queue->deliver(msg2); BOOST_CHECK(queue->dispatch(c2)); - BOOST_CHECK_EQUAL(msg2.get(), c2->last.get()); + BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get()); c1->received = false; queue->deliver(msg3); BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg3.get(), c1->last.get()); + BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get()); //Test cancellation: queue->cancel(c1); @@ -214,7 +214,7 @@ QPID_AUTO_TEST_CASE(testDequeue){ if (!consumer->received) sleep(2); - BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); + BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); received = queue->get().payload; @@ -298,14 +298,14 @@ QPID_AUTO_TEST_CASE(testSeek){ queue->deliver(msg2); queue->deliver(msg3); - TestConsumer::shared_ptr consumer(new TestConsumer(false)); + TestConsumer::shared_ptr consumer(new TestConsumer("test", false)); SequenceNumber seq(2); consumer->position = seq; QueuedMessage qm; queue->dispatch(consumer); - BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); + BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); queue->dispatch(consumer); queue->dispatch(consumer); // make sure over-run is safe @@ -325,14 +325,18 @@ QPID_AUTO_TEST_CASE(testSearch){ queue->deliver(msg3); SequenceNumber seq(2); - QueuedMessage qm = queue->find(seq); + QueuedMessage qm; + TestConsumer::shared_ptr c1(new TestConsumer()); + + BOOST_CHECK(queue->find(seq, qm)); BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue()); - queue->acquire(qm); + queue->acquire(qm, c1->getName()); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); SequenceNumber seq1(3); - QueuedMessage qm1 = queue->find(seq1); + QueuedMessage qm1; + BOOST_CHECK(queue->find(seq1, qm1)); BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue()); } @@ -552,12 +556,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ QueuedMessage qmsg2(queue.get(), msg2, ++sequence); framing::SequenceNumber sequence1(10); QueuedMessage qmsg3(queue.get(), 0, sequence1); + TestConsumer::shared_ptr dummy(new TestConsumer()); - BOOST_CHECK(!queue->acquire(qmsg)); - BOOST_CHECK(queue->acquire(qmsg2)); + BOOST_CHECK(!queue->acquire(qmsg, dummy->getName())); + BOOST_CHECK(queue->acquire(qmsg2, dummy->getName())); // Acquire the massage again to test failure case. - BOOST_CHECK(!queue->acquire(qmsg2)); - BOOST_CHECK(!queue->acquire(qmsg3)); + BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName())); + BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName())); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); @@ -567,7 +572,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ // set mode to no browse and check args.setOrdering(client::LVQ_NO_BROWSE); queue->configure(args); - TestConsumer::shared_ptr c1(new TestConsumer(false)); + TestConsumer::shared_ptr c1(new TestConsumer("test", false)); queue->dispatch(c1); queue->dispatch(c1); @@ -696,6 +701,280 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) { BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); } + +namespace { + // helper for group tests + void verifyAcquire( Queue::shared_ptr queue, + TestConsumer::shared_ptr c, + std::deque<QueuedMessage>& results, + const std::string& expectedGroup, + const int expectedId ) + { + queue->dispatch(c); + results.push_back(c->last); + std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); + int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( group, expectedGroup ); + BOOST_CHECK_EQUAL( id, expectedId ); + } +} + +QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { + // + // Verify that consumers of grouped messages own the groups once a message is acquired, + // and release the groups once all acquired messages have been dequeued or requeued + // + FieldTable args; + Queue::shared_ptr queue(new Queue("my_queue", true)); + args.setString("qpid.group_header_key", "GROUP-ID"); + args.setInt("qpid.shared_msg_group", 1); + queue->configure(args); + + std::string groups[] = { std::string("a"), std::string("a"), std::string("a"), + std::string("b"), std::string("b"), std::string("b"), + std::string("c"), std::string("c"), std::string("c") }; + for (int i = 0; i < 9; ++i) { + intrusive_ptr<Message> msg = create_message("e", "A"); + msg->insertCustomProperty("GROUP-ID", groups[i]); + msg->insertCustomProperty("MY-ID", i); + queue->deliver(msg); + } + + // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---, + + BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount()); + + TestConsumer::shared_ptr c1(new TestConsumer("C1")); + TestConsumer::shared_ptr c2(new TestConsumer("C2")); + + queue->consume(c1); + queue->consume(c2); + + std::deque<QueuedMessage> dequeMeC1; + std::deque<QueuedMessage> dequeMeC2; + + + verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0) + verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3) + + // now let c1 complete the 'a-0' message - this should free the 'a' group + queue->dequeue( 0, dequeMeC1.front() ); + dequeMeC1.pop_front(); + + // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, --- + + // now c2 should pick up the next 'a-1', since it is oldest free + verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b" + + // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, --- + + // c1 should only be able to snarf up the first "c" message now... + verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c" + + // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1 + + // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired) + queue->dequeue( 0, dequeMeC2.front() ); + dequeMeC2.pop_front(); + + // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1 + + // b group is free, c is owned by c1 - c1's next get should grab 'b-4' + verifyAcquire(queue, c1, dequeMeC1, "b", 4 ); + + // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1 + + // c2 can now only grab a-2, and that's all + verifyAcquire(queue, c2, dequeMeC2, "a", 2 ); + + // now C2 can't get any more, since C1 owns "b" and "c" group... + bool gotOne = queue->dispatch(c2); + BOOST_CHECK( !gotOne ); + + // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4) + queue->dequeue( 0, dequeMeC1.front() ); + dequeMeC1.pop_front(); + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ^C2, ^C2, ^C1, ^C1, ---, --- + + // c2 can now grab c-7 + verifyAcquire(queue, c2, dequeMeC2, "c", 7 ); + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2 + + // what happens if C-2 "requeues" a-1 and a-2? + queue->requeue( dequeMeC2.front() ); + dequeMeC2.pop_front(); + queue->requeue( dequeMeC2.front() ); + dequeMeC2.pop_front(); // now just has c-7 acquired + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2 + + // now c1 will grab a-1 and a-2... + verifyAcquire(queue, c1, dequeMeC1, "a", 1 ); + verifyAcquire(queue, c1, dequeMeC1, "a", 2 ); + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2 + + // c2 can now acquire c-8 only + verifyAcquire(queue, c2, dequeMeC2, "c", 8 ); + + // and c1 can get b-5 + verifyAcquire(queue, c1, dequeMeC1, "b", 5 ); + + // should be no more acquire-able for anyone now: + gotOne = queue->dispatch(c1); + BOOST_CHECK( !gotOne ); + gotOne = queue->dispatch(c2); + BOOST_CHECK( !gotOne ); + + // requeue all of C1's acquired messages, then cancel C1 + while (!dequeMeC1.empty()) { + queue->requeue(dequeMeC1.front()); + dequeMeC1.pop_front(); + } + queue->cancel(c1); + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ---, ---, ---, ---, ^C2, ^C2 + + // b-4, a-1, a-2, b-5 all should be available, right? + verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); + + while (!dequeMeC2.empty()) { + queue->dequeue(0, dequeMeC2.front()); + dequeMeC2.pop_front(); + } + + // Queue = a-2, b-4, b-5 + // Owners= ---, ---, --- + + TestConsumer::shared_ptr c3(new TestConsumer("C3")); + std::deque<QueuedMessage> dequeMeC3; + + verifyAcquire(queue, c3, dequeMeC3, "a", 2 ); + verifyAcquire(queue, c2, dequeMeC2, "b", 4 ); + + // Queue = a-2, b-4, b-5 + // Owners= ^C3, ^C2, ^C2 + + gotOne = queue->dispatch(c3); + BOOST_CHECK( !gotOne ); + + verifyAcquire(queue, c2, dequeMeC2, "b", 5 ); + + while (!dequeMeC2.empty()) { + queue->dequeue(0, dequeMeC2.front()); + dequeMeC2.pop_front(); + } + + // Queue = a-2, + // Owners= ^C3, + + intrusive_ptr<Message> msg = create_message("e", "A"); + msg->insertCustomProperty("GROUP-ID", "a"); + msg->insertCustomProperty("MY-ID", 9); + queue->deliver(msg); + + // Queue = a-2, a-9 + // Owners= ^C3, ^C3 + + gotOne = queue->dispatch(c2); + BOOST_CHECK( !gotOne ); + + msg = create_message("e", "A"); + msg->insertCustomProperty("GROUP-ID", "b"); + msg->insertCustomProperty("MY-ID", 10); + queue->deliver(msg); + + // Queue = a-2, a-9, b-10 + // Owners= ^C3, ^C3, ---- + + verifyAcquire(queue, c2, dequeMeC2, "b", 10 ); + verifyAcquire(queue, c3, dequeMeC3, "a", 9 ); + + gotOne = queue->dispatch(c3); + BOOST_CHECK( !gotOne ); + + queue->cancel(c2); + queue->cancel(c3); +} + + +QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { + // + // Verify that the same default group name is automatically applied to messages that + // do not specify a group name. + // + FieldTable args; + Queue::shared_ptr queue(new Queue("my_queue", true)); + args.setString("qpid.group_header_key", "GROUP-ID"); + args.setInt("qpid.shared_msg_group", 1); + queue->configure(args); + + for (int i = 0; i < 3; ++i) { + intrusive_ptr<Message> msg = create_message("e", "A"); + // no "GROUP-ID" header + msg->insertCustomProperty("MY-ID", i); + queue->deliver(msg); + } + + // Queue = 0, 1, 2 + + BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); + + TestConsumer::shared_ptr c1(new TestConsumer("C1")); + TestConsumer::shared_ptr c2(new TestConsumer("C2")); + + queue->consume(c1); + queue->consume(c2); + + std::deque<QueuedMessage> dequeMeC1; + std::deque<QueuedMessage> dequeMeC2; + + queue->dispatch(c1); // c1 now owns default group (acquired 0) + dequeMeC1.push_back(c1->last); + int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( id, 0 ); + + bool gotOne = queue->dispatch(c2); // c2 should get nothing + BOOST_CHECK( !gotOne ); + + queue->dispatch(c1); // c1 now acquires 1 + dequeMeC1.push_back(c1->last); + id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( id, 1 ); + + gotOne = queue->dispatch(c2); // c2 should still get nothing + BOOST_CHECK( !gotOne ); + + while (!dequeMeC1.empty()) { + queue->dequeue(0, dequeMeC1.front()); + dequeMeC1.pop_front(); + } + + // now default group should be available... + queue->dispatch(c2); // c2 now owns default group (acquired 2) + id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( id, 2 ); + + gotOne = queue->dispatch(c1); // c1 should get nothing + BOOST_CHECK( !gotOne ); + + queue->cancel(c1); + queue->cancel(c2); +} + QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ TestMessageStoreOC testStore;
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1180050&r1=1180049&r2=1180050&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Oct 7 14:21:48 2011 @@ -1418,6 +1418,76 @@ class LongTests(BrokerTest): if receiver: receiver.connection.detach() logger.setLevel(log_level) + def test_msg_group_failover(self): + """Test fail-over during continuous send-receive of grouped messages. + """ + + class GroupedTrafficGenerator(Thread): + def __init__(self, url, queue, group_key): + Thread.__init__(self) + self.url = url + self.queue = queue + self.group_key = group_key + self.status = -1 + + def run(self): + # generate traffic for approx 10 seconds (2011msgs / 200 per-sec) + cmd = ["msg_group_test", + "--broker=%s" % self.url, + "--address=%s" % self.queue, + "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS), + "--group-key=%s" % self.group_key, + "--receivers=2", + "--senders=3", + "--messages=2011", + "--send-rate=200", + "--capacity=11", + "--ack-frequency=23", + "--allow-duplicates", + "--group-size=37", + "--randomize-group-size", + "--interleave=13"] + # "--trace"] + self.generator = Popen( cmd ); + self.status = self.generator.wait() + return self.status + + def results(self): + self.join(timeout=30) # 3x assumed duration + if self.isAlive(): return -1 + return self.status + + # Original cluster will all be killed so expect exit with failure + cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"]) + for b in cluster: b.ready() # Wait for brokers to be ready + + # create a queue with rather draconian flow control settings + ssn0 = cluster[0].connect().session() + q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}" + s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args) + + # Kill original brokers, start new ones for the duration. + endtime = time.time() + self.duration(); + i = 0 + while time.time() < endtime: + traffic = GroupedTrafficGenerator( cluster[i].host_port(), + "test-group-q", "group-id" ) + traffic.start() + time.sleep(1) + + for x in range(2): + for b in cluster[i:]: b.ready() # Check if any broker crashed. + cluster[i].kill() + i += 1 + b = cluster.start(expect=EXPECT_EXIT_FAIL) + time.sleep(1) + + # wait for traffic to finish, verify success + self.assertEqual(0, traffic.results()) + + for i in range(i, len(cluster)): cluster[i].kill() + + class StoreTests(BrokerTest): """ Cluster tests that can only be run if there is a store available. Propchange: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Oct 7 14:21:48 2011 @@ -1 +1,2 @@ /qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py:1061302-1072333 +/qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py:1144319-1179855 Modified: qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp?rev=1180050&r1=1180049&r2=1180050&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp Fri Oct 7 14:21:48 2011 @@ -28,6 +28,7 @@ #include <qpid/messaging/FailoverUpdates.h> #include <qpid/sys/Time.h> #include <qpid/sys/Monitor.h> +#include <qpid/sys/SystemInfo.h> #include "TestOptions.h" #include "Statistics.h" @@ -76,6 +77,11 @@ struct Options : public qpid::Options uint flowControl; bool sequence; bool timestamp; + std::string groupKey; + std::string groupPrefix; + uint groupSize; + bool groupRandSize; + uint groupInterleave; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -100,7 +106,11 @@ struct Options : public qpid::Options sendRate(0), flowControl(0), sequence(true), - timestamp(true) + timestamp(true), + groupPrefix("GROUP-"), + groupSize(10), + groupRandSize(false), + groupInterleave(1) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -111,8 +121,8 @@ struct Options : public qpid::Options ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address") ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") - ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") - ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)") + ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") + ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)") ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") @@ -131,6 +141,11 @@ struct Options : public qpid::Options ("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.") ("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)") ("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)") + ("group-key", qpid::optValue(groupKey, "KEY"), "Generate groups of messages using message header 'KEY' to hold the group identifier") + ("group-prefix", qpid::optValue(groupPrefix, "STRING"), "Generate group identifers with 'STRING' prefix (if group-key specified)") + ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group (if group-key specified)") + ("group-randomize-size", qpid::optValue(groupRandSize), "Randomize the number of messages per group to [1...group-size] (if group-key specified)") + ("group-interleave", qpid::optValue(groupInterleave, "N"), "Simultaineously interleave messages from N different groups (if group-key specified)") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -252,6 +267,68 @@ class MapContentGenerator : public Con const Options& opts; }; +// tag each generated message with a group identifer +// +class GroupGenerator { +public: + GroupGenerator(const std::string& key, + const std::string& prefix, + const uint size, + const bool randomize, + const uint interleave) + : groupKey(key), groupPrefix(prefix), groupSize(size), + randomizeSize(randomize), groupSuffix(0) + { + if (randomize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId()); + + for (uint i = 0; i < 1 || i < interleave; ++i) { + newGroup(); + } + current = groups.begin(); + } + + void setGroupInfo(Message &msg) + { + if (current == groups.end()) + current = groups.begin(); + msg.getProperties()[groupKey] = current->id; + // std::cout << "SENDING GROUPID=[" << current->id << "]" << std::endl; + if (++(current->count) == current->size) { + newGroup(); + groups.erase(current++); + } else + ++current; + } + + private: + const std::string& groupKey; + const std::string& groupPrefix; + const uint groupSize; + const bool randomizeSize; + + uint groupSuffix; + + struct GroupState { + std::string id; + const uint size; + uint count; + GroupState( const std::string& i, const uint s ) + : id(i), size(s), count(0) {} + }; + typedef std::list<GroupState> GroupList; + GroupList groups; + GroupList::iterator current; + + void newGroup() { + std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate); + groupId << groupSuffix++; + uint size = (randomizeSize) ? (rand() % groupSize) + 1 : groupSize; + // std::cout << "New group: GROUPID=[" << groupId.str() << "] size=" << size << std::endl; + GroupState group( groupId.str(), size ); + groups.push_back( group ); + } +}; + int main(int argc, char ** argv) { Connection connection; @@ -296,6 +373,14 @@ int main(int argc, char ** argv) else contentGen.reset(new FixedContentGenerator(opts.contentString)); + std::auto_ptr<GroupGenerator> groupGen; + if (!opts.groupKey.empty()) + groupGen.reset(new GroupGenerator(opts.groupKey, + opts.groupPrefix, + opts.groupSize, + opts.groupRandSize, + opts.groupInterleave)); + qpid::sys::AbsTime start = qpid::sys::now(); int64_t interval = 0; if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate; @@ -312,9 +397,6 @@ int main(int argc, char ** argv) ++sent; if (opts.sequence) msg.getProperties()[SN] = sent; - if (opts.timestamp) - msg.getProperties()[TS] = int64_t( - qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); if (opts.flowControl) { if ((sent % opts.flowControl) == 0) { msg.setReplyTo(flowControlAddress); @@ -323,6 +405,12 @@ int main(int argc, char ** argv) else msg.setReplyTo(Address()); // Clear the reply address. } + if (groupGen.get()) + groupGen->setGroupInfo(msg); + + if (opts.timestamp) + msg.getProperties()[TS] = int64_t( + qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); sender.send(msg); reporter.message(msg); Modified: qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml?rev=1180050&r1=1180049&r2=1180050&view=diff ============================================================================== --- qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml (original) +++ qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml Fri Oct 7 14:21:48 2011 @@ -59,6 +59,7 @@ <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="producer-flow-control.xml"/> <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="AMQP-Compatibility.xml"/> <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Qpid-Interoperability-Documentation.xml"/> + <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Using-message-groups.xml"/> </chapter> Modified: qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP.xml?rev=1180050&r1=1180049&r2=1180050&view=diff ============================================================================== --- qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP.xml (original) +++ qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP.xml Fri Oct 7 14:21:48 2011 @@ -52,6 +52,7 @@ <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Starting-a-cluster.xml"/> <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="ACL.xml"/> <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="producer-flow-control.xml"/> + <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Using-message-groups.xml"/> </chapter> Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1180050&r1=1180049&r2=1180050&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Fri Oct 7 14:21:48 2011 @@ -694,7 +694,8 @@ public class QMFService implements Confi public BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommand queueMoveMessages(final BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommandFactory factory, final String srcQueue, final String destQueue, - final Long qty) + final Long qty, + final Map filter) // TODO: move based on group identifier { // TODO return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); @@ -731,6 +732,14 @@ public class QMFService implements Confi return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); } + public BrokerSchema.BrokerClass.QueryMethodResponseCommand query(final BrokerSchema.BrokerClass.QueryMethodResponseCommandFactory factory, + final String type, + final String name) + { + //TODO: + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + public UUID getId() { return _obj.getId(); @@ -1102,7 +1111,8 @@ public class QMFService implements Confi } public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory, - final Long request) + final Long request, + final Map filter) // TODO: support for purge-by-group-identifier { try { @@ -1118,7 +1128,8 @@ public class QMFService implements Confi public BrokerSchema.QueueClass.RerouteMethodResponseCommand reroute(final BrokerSchema.QueueClass.RerouteMethodResponseCommandFactory factory, final Long request, final Boolean useAltExchange, - final String exchange) + final String exchange, + final Map filter) // TODO: support for re-route-by-group-identifier { //TODO return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); Modified: qpid/trunk/qpid/specs/management-schema.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1180050&r1=1180049&r2=1180050&view=diff ============================================================================== --- qpid/trunk/qpid/specs/management-schema.xml (original) +++ qpid/trunk/qpid/specs/management-schema.xml Fri Oct 7 14:21:48 2011 @@ -92,6 +92,7 @@ <arg name="srcQueue" dir="I" type="sstr" desc="Source queue"/> <arg name="destQueue" dir="I" type="sstr" desc="Destination queue"/> <arg name="qty" dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/> + <arg name="filter" dir="I" type="map" default="{}" desc="if specified, move only those messages matching this filter"/> </method> <method name="setLogLevel" desc="Set the log level"> @@ -115,6 +116,13 @@ <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/> </method> + <method name="query" desc="Query the current state of an object."> + <arg name="type" dir="I" type="sstr" desc="The type of object to query."/> + <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/> + <arg name="results" dir="O" type="map" desc="A snapshot of the object's state."/> + </method> + + </class> <!-- @@ -180,12 +188,14 @@ <method name="purge" desc="Discard all or some messages on a queue"> <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> + <arg name="filter" dir="I" type="map" default="{}" desc="if specified, purge only those messages matching this filter"/> </method> <method name="reroute" desc="Remove all or some messages on this queue and route them to an exchange"> <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> <arg name="useAltExchange" dir="I" type="bool" desc="Iff true, use the queue's configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/> <arg name="exchange" dir="I" type="sstr" desc="Name of the exchange to route the messages through"/> + <arg name="filter" dir="I" type="map" default="{}" desc="if specified, reroute only those messages matching this filter"/> </method> </class> Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1180050&r1=1180049&r2=1180050&view=diff ============================================================================== --- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (original) +++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Fri Oct 7 14:21:48 2011 @@ -33,3 +33,4 @@ from lvq import * from priority import * from threshold import * from extensions import * +from msg_groups import * Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1180050&r1=1180049&r2=1180050&view=diff ============================================================================== --- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (original) +++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Fri Oct 7 14:21:48 2011 @@ -156,7 +156,7 @@ class ManagementTest (TestBase010): queues = self.qmf.getObjects(_class="queue") "Move 10 messages from src-queue to dest-queue" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10, {}) self.assertEqual (result.status, 0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] @@ -166,7 +166,7 @@ class ManagementTest (TestBase010): self.assertEqual (dq.msgDepth,10) "Move all remaining messages to destination" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0, {}) self.assertEqual (result.status,0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] @@ -176,16 +176,16 @@ class ManagementTest (TestBase010): self.assertEqual (dq.msgDepth,20) "Use a bad source queue name" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0, {}) self.assertEqual (result.status,4) "Use a bad destination queue name" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0, {}) self.assertEqual (result.status,4) " Use a large qty (40) to move from dest-queue back to " " src-queue- should move all " - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40, {}) self.assertEqual (result.status,0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] @@ -225,19 +225,19 @@ class ManagementTest (TestBase010): pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] "Purge top message from purge-queue" - result = pq.purge(1) + result = pq.purge(1, {}) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,19) "Purge top 9 messages from purge-queue" - result = pq.purge(9) + result = pq.purge(9, {}) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,10) "Purge all messages from purge-queue" - result = pq.purge(0) + result = pq.purge(0, {}) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,0) @@ -263,7 +263,7 @@ class ManagementTest (TestBase010): #reroute messages from test queue to amq.fanout (and hence to #rerouted queue): pq = self.qmf.getObjects(_class="queue", name="test-queue")[0] - result = pq.reroute(0, False, "amq.fanout") + result = pq.reroute(0, False, "amq.fanout", {}) self.assertEqual(result.status, 0) #verify messages are all rerouted: @@ -301,7 +301,7 @@ class ManagementTest (TestBase010): pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0] "Reroute top message from reroute-queue to alternate exchange" - result = pq.reroute(1, True, "") + result = pq.reroute(1, True, "", {}) self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0] @@ -309,7 +309,7 @@ class ManagementTest (TestBase010): self.assertEqual(aq.msgDepth,1) "Reroute top 9 messages from reroute-queue to alt.direct2" - result = pq.reroute(9, False, "alt.direct2") + result = pq.reroute(9, False, "alt.direct2", {}) self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] @@ -317,11 +317,11 @@ class ManagementTest (TestBase010): self.assertEqual(aq.msgDepth,9) "Reroute using a non-existent exchange" - result = pq.reroute(0, False, "amq.nosuchexchange") + result = pq.reroute(0, False, "amq.nosuchexchange", {}) self.assertEqual(result.status, 4) "Reroute all messages from reroute-queue" - result = pq.reroute(0, False, "alt.direct2") + result = pq.reroute(0, False, "alt.direct2", {}) self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] @@ -337,7 +337,7 @@ class ManagementTest (TestBase010): session.message_transfer(destination="amq.direct", message=msg) "Reroute onto the same queue" - result = pq.reroute(0, False, "amq.direct") + result = pq.reroute(0, False, "amq.direct", {}) self.assertEqual(result.status, 0) pq.update() self.assertEqual(pq.msgDepth,20) @@ -365,7 +365,7 @@ class ManagementTest (TestBase010): # 4. Call reroute on queue Y and specify that messages should # be sent to exchange A y = self.qmf.getObjects(_class="queue", name="Y")[0] - result = y.reroute(1, False, "A") + result = y.reroute(1, False, "A", {}) self.assertEqual(result.status, 0) # 5. verify that the message is rerouted through B (as A has Modified: qpid/trunk/qpid/tools/src/py/qpid-config URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-config?rev=1180050&r1=1180049&r2=1180050&view=diff ============================================================================== --- qpid/trunk/qpid/tools/src/py/qpid-config (original) +++ qpid/trunk/qpid/tools/src/py/qpid-config Fri Oct 7 14:21:48 2011 @@ -96,6 +96,8 @@ class Config: self._flowResumeCount = None self._flowStopSize = None self._flowResumeSize = None + self._msgGroupHeader = None + self._sharedMsgGroup = False self._extra_arguments = [] self._returnCode = 0 @@ -116,13 +118,16 @@ FLOW_STOP_COUNT = "qpid.flow_stop_coun FLOW_RESUME_COUNT = "qpid.flow_resume_count" FLOW_STOP_SIZE = "qpid.flow_stop_size" FLOW_RESUME_SIZE = "qpid.flow_resume_size" +MSG_GROUP_HDR_KEY = "qpid.group_header_key" +SHARED_MSG_GROUP = "qpid.shared_msg_group" #There are various arguments to declare that have specific program #options in this utility. However there is now a generic mechanism for #passing arguments as well. The SPECIAL_ARGS list contains the #arguments for which there are specific program options defined #i.e. the arguments for which there is special processing on add and #list -SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE] +SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, + MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP] class JHelpFormatter(IndentedHelpFormatter): """Format usage and description without stripping newlines from usage strings @@ -182,6 +187,10 @@ def OptionsAndArguments(argv): help="Turn on sender flow control when the number of queued messages exceeds this value.") group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>", help="Turn off sender flow control when the number of queued messages drops below this value.") + group3.add_option("--group-header", action="store", type="string", metavar="<header-name>", + help="Enable message groups. Specify name of header that holds group identifier.") + group3.add_option("--shared-groups", action="store_true", + help="Allow message group consumption across multiple consumers.") group3.add_option("--argument", dest="extra_arguments", action="append", default=[], metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments") # no option for declaring an exclusive queue - which can only be used by the session that creates it. @@ -263,6 +272,10 @@ def OptionsAndArguments(argv): config._flowStopCount = opts.flow_stop_count if opts.flow_resume_count: config._flowResumeCount = opts.flow_resume_count + if opts.group_header: + config._msgGroupHeader = opts.group_header + if opts.shared_groups: + config._sharedMsgGroup = True if opts.extra_arguments: config._extra_arguments = opts.extra_arguments return args @@ -442,6 +455,8 @@ class BrokerManager: if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE], if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT], if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT], + if MSG_GROUP_HDR_KEY in args: print "--group-header=%s" % args[MSG_GROUP_HDR_KEY], + if SHARED_MSG_GROUP in args and args[SHARED_MSG_GROUP] == 1: print "--shared-groups", print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS]) def QueueListRecurse(self, filter): @@ -534,6 +549,11 @@ class BrokerManager: if config._flowResumeCount: declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount + if config._msgGroupHeader: + declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader + if config._sharedMsgGroup: + declArgs[SHARED_MSG_GROUP] = 1 + if config._altern_ex != None: self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) else: --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
