Author: kgiusti
Date: Fri Nov  4 17:39:49 2011
New Revision: 1197686

URL: http://svn.apache.org/viewvc?rev=1197686&view=rev
Log:
QPID-3564: enhance message group generator to allow queue fill/drain tests

Modified:
    qpid/trunk/qpid/cpp/src/tests/msg_group_test.cpp
    qpid/trunk/qpid/cpp/src/tests/run_msg_group_tests
    qpid/trunk/qpid/cpp/src/tests/run_msg_group_tests_soak

Modified: qpid/trunk/qpid/cpp/src/tests/msg_group_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/msg_group_test.cpp?rev=1197686&r1=1197685&r2=1197686&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/msg_group_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/msg_group_test.cpp Fri Nov  4 17:39:49 2011
@@ -126,6 +126,8 @@ struct Options : public qpid::Options
         try {
             qpid::Options::parse(argc, argv);
             if (address.empty()) throw qpid::Exception("Address must be specified!");
+            if (senders == 0 && receivers == 0) throw qpid::Exception("No senders and No receivers?");
+            if (messages == 0) throw qpid::Exception("The message count cannot be zero.");
             qpid::log::Logger::instance().configure(log);
             if (help) {
                 std::ostringstream msg;
@@ -152,7 +154,9 @@ class GroupChecker
 {
     qpid::sys::Mutex lock;
 
-    const uint totalMsgs;
+    uint consumerCt;
+    uint producerCt;
+    uint totalMsgs;
     uint totalMsgsConsumed;
     uint totalMsgsPublished;
     bool allowDuplicates;
@@ -169,9 +173,18 @@ class GroupChecker
 
 public:
 
-    GroupChecker( uint t, bool d ) :
-        totalMsgs(t), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
-        duplicateMsgs(0) {}
+    GroupChecker( uint messages, uint consumers, uint producers, bool d) :
+        consumerCt(consumers), producerCt(producers),
+        totalMsgs(0), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
+        duplicateMsgs(0)
+    {
+        // if consumering only - we a draining a queue of 'messages' queued messages.
+        if (producerCt != 0) {
+            totalMsgs = producers * messages;
+        } else {
+            totalMsgs = messages;
+        }
+    }
 
     bool checkSequence( const std::string& groupId,
                         uint sequence, const std::string& client )
@@ -227,12 +240,22 @@ public:
         return sequenceMap[groupId];
     }
 
-    bool allMsgsConsumed()  // true when done processing msgs
+    bool allMsgsPublished() // true when done publishing msgs
+    {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        return (producerCt == 0 || totalMsgsPublished >= totalMsgs);
+    }
+
+    bool allMsgsConsumed()  // true when done consuming msgs
     {
         qpid::sys::Mutex::ScopedLock l(lock);
-        return (totalMsgsPublished >= totalMsgs) &&
-          (totalMsgsConsumed >= totalMsgsPublished) &&
-          sequenceMap.size() == 0;
+        return (consumerCt == 0 ||
+                (totalMsgsConsumed >= totalMsgs && sequenceMap.size() == 0));
+    }
+
+    uint getTotalMessages()
+    {
+        return totalMsgs;
     }
 
     uint getConsumedTotal()
@@ -533,7 +556,9 @@ int main(int argc, char ** argv)
         Options opts;
         if (opts.parse(argc, argv)) {
 
-            GroupChecker state( opts.senders * opts.messages,
+            GroupChecker state( opts.messages,
+                                opts.receivers,
+                                opts.senders,
                                 opts.allowDuplicates);
             std::vector<Client::shared_ptr> clients;
 
@@ -555,48 +580,47 @@ int main(int argc, char ** argv)
 
             // wait for all pubs/subs to finish.... or for consumers to fail or stall.
             uint stalledTime = 0;
-            bool done;
             bool clientFailed = false;
-            do {
-                uint lastCount = state.getConsumedTotal();
+            while (!clientFailed && (!state.allMsgsPublished() || !state.allMsgsConsumed())) {
+                uint lastCount;
+
+                lastCount = state.getConsumedTotal();
                 qpid::sys::usleep( 1000000 );
 
-                // check each client for status
-                done = true;
+                // check each client for failures
                 for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
                      i != clients.end(); ++i) {
                     QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState());
                     if ((*i)->getState() == Client::FAILURE) {
                         QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg());
                         clientFailed = true;
-                        done = true;
                         break;  // exit test.
-                    } else if ((*i)->getState() != Client::DONE) {
-                        done = false;
                     }
                 }
 
-                if (!done) {
-                    // check that consumers are still receiving messages
-                    if (lastCount == state.getConsumedTotal())
-                        stalledTime++;
-                    else {
-                        lastCount = state.getConsumedTotal();
+                // check for stalled consumers
+                if (!clientFailed && !state.allMsgsConsumed()) {
+                    if (lastCount == state.getConsumedTotal()) {
+                        if (++stalledTime >= opts.timeout) {
+                            clientFailed = true;
+                            break;  // exit test
+                        }
+                    } else {
                         stalledTime = 0;
                     }
                 }
-
                 QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() <<
                          " Published to date = " << state.getPublishedTotal() <<
-                         " total=" << opts.senders * opts.messages );
-
-            } while (!done && stalledTime < opts.timeout);
+                         " total=" << state.getTotalMessages());
+            }
 
             if (clientFailed) {
-                status = 1;
-            } else if (stalledTime >= opts.timeout) {
-                QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." );
-                status = 2;
+                if (stalledTime >= opts.timeout) {
+                    QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." );
+                    status = 2;
+                } else {
+                    status = 1;
+                }
             }
 
             // Wait for started threads.

Modified: qpid/trunk/qpid/cpp/src/tests/run_msg_group_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_msg_group_tests?rev=1197686&r1=1197685&r2=1197686&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_msg_group_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_msg_group_tests Fri Nov  4 17:39:49 2011
@@ -54,6 +54,8 @@ tests=("qpid-config -a $BROKER_URL add q
     "qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59  --group-size 3  --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 211 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79 --interleave 53"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10000  --group-size 1 --receivers 0 --senders 1"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10000  --receivers 5 --senders 0"
     "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
 
 while [ -n "${tests[i]}" ]; do

Modified: qpid/trunk/qpid/cpp/src/tests/run_msg_group_tests_soak
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_msg_group_tests_soak?rev=1197686&r1=1197685&r2=1197686&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_msg_group_tests_soak (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_msg_group_tests_soak Fri Nov  4 17:39:49 2011
@@ -48,6 +48,8 @@ tests=("qpid-config -a $BROKER_URL add q
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 40000  --receivers 0 --senders 5 --group-size 13 --randomize-group-size"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 200000 --receivers 3 --senders 0 --capacity 23 --ack-frequency 7"
     "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
 
 while [ -n "${tests[i]}" ]; do



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to