Author: aconway
Date: Wed Dec 15 18:10:12 2010
New Revision: 1049656

URL: http://svn.apache.org/viewvc?rev=1049656&view=rev
Log:
Fix flow control for qpid-cpp-benchmark with multiple senders.

Ensure senders & receivers agree on number of messages sent/received.

Modified:
    qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
    qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1049656&r1=1049655&r2=1049656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark Wed Dec 15 18:10:12 2010
@@ -75,12 +75,16 @@ def ssh_command(host, command):
     """Convert command into an ssh command on host with quoting"""
     return ["ssh", host] + [posix_quote(arg) for arg in command]
 
-def start_receive(queue, opts, ready_queue, broker, host):
+def start_receive(queue, index, opts, ready_queue, broker, host):
     address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option))
+    msg_total=opts.senders*opts.messages
+    messages = msg_total/opts.receivers;
+    if (index < msg_total%opts.receivers): messages += 1
+    if (messages == 0): return None
     command = ["qpid-receive",
                "-b", broker,
                "-a", address,
-               "-m", str((opts.senders*opts.messages)/opts.receivers),
+               "-m", str(messages),
                "--forever",
                "--print-content=no",
                "--receive-rate", str(opts.receive_rate),
@@ -101,7 +105,6 @@ def start_send(queue, opts, broker, host
                "-b", broker,
                "-a", address,
                "--messages", str(opts.messages),
-               "--send-eos", str(opts.receivers),
                "--content-size", str(opts.content_size),
                "--send-rate", str(opts.send_rate),
                "--report-total",
@@ -118,7 +121,7 @@ def start_send(queue, opts, broker, host
 
 def first_line(p):
     out,err=p.communicate()
-    if p.returncode != 0: raise Exception("ERROR:\n%s"%(out))
+    if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip()))
     return out.split("\n")[0]
 
 def delete_queues(queues, broker):
@@ -144,7 +147,7 @@ def parse_senders(senders):
     return parse([int],[first_line(p) for p in senders])
     
 def parse_receivers(receivers):
-    return parse([int,float,float,float],[first_line(p) for p in receivers])
+    return parse([int,float,float,float],[first_line(p) for p in receivers if p])
 
 def print_data(send_stats, recv_stats):
     for send,recv in map(None, send_stats, recv_stats):
@@ -216,9 +219,9 @@ def main():
     for i in xrange(opts.repeat):
         delete_queues(queues, opts.broker[0])
         ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
-        receivers = [start_receive(q, opts, ready_queue, brokers.next(), client_hosts.next())
+        receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
                      for q in queues for j in xrange(opts.receivers)]
-        ready_receiver.wait(receivers) # Wait for receivers to be ready.
+        ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
         senders = [start_send(q, opts,brokers.next(), client_hosts.next())
                    for q in queues for j in xrange(opts.senders)]
         if opts.report_header and i == 0: print_header(opts.timestamp)

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp?rev=1049656&r1=1049655&r2=1049656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp Wed Dec 15 18:10:12 2010
@@ -191,8 +191,7 @@ int main(int argc, char ** argv)
             int64_t interval = 0;
             if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate;
 
-            Address replyToAddress;
-            Sender replyToSender;
+            std::map<std::string,Sender> replyTo;
 
             while (!done && receiver.fetch(msg, timeout)) {
                 reporter.message(msg);
@@ -227,12 +226,12 @@ int main(int argc, char ** argv)
                     session.acknowledge();
                 }
                 if (msg.getReplyTo()) { // Echo message back to reply-to address.
-                    if (msg.getReplyTo() != replyToAddress) {
-                        replyToSender = session.createSender(msg.getReplyTo());
-                        replyToSender.setCapacity(opts.capacity);
-                        replyToAddress = msg.getReplyTo();
+                    Sender& s = replyTo[msg.getReplyTo().str()];
+                    if (s.isNull()) {
+                        s = session.createSender(msg.getReplyTo());
+                        s.setCapacity(opts.capacity);
                     }
-                    replyToSender.send(msg);
+                    s.send(msg);
                 }
                 if (opts.receiveRate) {
                     qpid::sys::AbsTime waitTill(start, count*interval);

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp?rev=1049656&r1=1049655&r2=1049656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp Wed Dec 15 18:10:12 2010
@@ -291,7 +291,7 @@ int main(int argc, char ** argv)
             if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
 
             Receiver flowControlReceiver;
-            Address flowControlAddress(Uuid(true).str()+";{create:always}");
+            Address flowControlAddress("flow-"+Uuid(true).str()+";{create:always,delete:always}");
             uint flowSent = 0;
             if (opts.flowControl) {
                 flowControlReceiver = session.createReceiver(flowControlAddress);
@@ -322,7 +322,7 @@ int main(int argc, char ** argv)
                 if (opts.messages && sent >= opts.messages) break;
 
                 if (opts.flowControl && flowSent == 2) {
-                    flowControlReceiver.get(Duration::SECOND*1);
+                    flowControlReceiver.get(Duration::SECOND);
                     --flowSent;
                 }
 
@@ -333,11 +333,13 @@ int main(int argc, char ** argv)
                 }
                 msg = Message(); // Clear out contents and properties for next iteration
             }
+            for ( ; flowSent>0; --flowSent)
+                flowControlReceiver.get(Duration::SECOND);
             if (opts.reportTotal) reporter.report();
             for (uint i = opts.sendEos; i > 0; --i) {
                 if (opts.sequence)
                     msg.getProperties()[SN] = ++sent;
-                msg.setContent(EOS);//TODO: add in ability to send digest or similar
+                msg.setContent(EOS); //TODO: add in ability to send digest or similar
                 sender.send(msg);
             }
             if (opts.tx) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to