Author: aconway
Date: Tue Dec 14 21:30:55 2010
New Revision: 1049286

URL: http://svn.apache.org/viewvc?rev=1049286&view=rev
Log:
Add end-to-end flow control to qpid-send, qpid-receive and qpid-cpp-benchmark.

Modified:
    qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
    qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1049286&r1=1049285&r2=1049286&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark Tue Dec 14 21:30:55 2010
@@ -44,7 +44,7 @@ op.add_option("--receive-rate", default=
               help="receive rate limited to N messages/second, 0 means no 
limit (default %default)")
 op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
               help="message size in bytes (default %default)")
-op.add_option("--ack-frequency", default=0, metavar="N", type="int",
+op.add_option("--ack-frequency", default=100, metavar="N", type="int",
               help="receiver ack's every N messages, 0 means unconfirmed 
(default %default)")
 op.add_option("--no-report-header", dest="report_header", default=True,
               action="store_false", help="don't print header on report")
@@ -63,8 +63,10 @@ op.add_option("--no-timestamp", dest="ti
               action="store_false", help="don't add a timestamp, no latency 
results")
 op.add_option("--connection-options", type="str",
               help="Connection options for senders & receivers")
-single_quote_re = re.compile("'")
+op.add_option("--flow-control", default=0, type="int", metavar="N",
+              help="Flow control each sender to limit queue depth to 2*N. 0 
means no flow control.")
 
+single_quote_re = re.compile("'")
 def posix_quote(string):
     """ Quote a string for use as an argument in a posix shell"""
     return "'" + single_quote_re.sub("\\'", string) + "'";
@@ -85,12 +87,13 @@ def start_receive(queue, opts, ready_que
                "--report-total",
                "--ack-frequency", str(opts.ack_frequency),
                "--ready-address", ready_queue,
-               "--report-header=no"]
+               "--report-header=no"
+               ]
     command += opts.receive_arg
     if opts.connection_options:
         command += ["--connection-options",opts.connection_options]
     if host: command = ssh_command(host, command)
-    return Popen(command, stdout=PIPE, stderr=STDOUT)
+    return Popen(command, stdout=PIPE)
 
 def start_send(queue, opts, broker, host):
     address="%s;{%s}"%(queue,",".join(opts.send_option))
@@ -104,12 +107,14 @@ def start_send(queue, opts, broker, host
                "--report-total",
                "--report-header=no",
                "--timestamp=%s"%(opts.timestamp and "yes" or "no"),
-               "--sequence=no"]
+               "--sequence=no",
+               "--flow-control", str(opts.flow_control)
+               ]
     command += opts.send_arg
     if opts.connection_options:
         command += ["--connection-options",opts.connection_options]
     if host: command = ssh_command(host, command)
-    return Popen(command, stdout=PIPE, stderr=STDOUT)
+    return Popen(command, stdout=PIPE)
 
 def first_line(p):
     out,err=p.communicate()

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp?rev=1049286&r1=1049285&r2=1049286&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp Tue Dec 14 21:30:55 2010
@@ -191,6 +191,9 @@ int main(int argc, char ** argv)
             int64_t interval = 0;
             if (opts.receiveRate) interval = 
qpid::sys::TIME_SEC/opts.receiveRate;
 
+            Address replyToAddress;
+            Sender replyToSender;
+
             while (!done && receiver.fetch(msg, timeout)) {
                 reporter.message(msg);
                 if (!opts.ignoreDuplicates || 
!sequenceTracker.isDuplicate(msg)) {
@@ -223,12 +226,21 @@ int main(int argc, char ** argv)
                 } else if (opts.ackFrequency && (count % opts.ackFrequency == 
0)) {
                     session.acknowledge();
                 }
+                if (msg.getReplyTo()) { // Echo message back to reply-to 
address.
+                    if (msg.getReplyTo() != replyToAddress) {
+                        replyToSender = session.createSender(msg.getReplyTo());
+                        replyToSender.setCapacity(opts.capacity);
+                        replyToAddress = msg.getReplyTo();
+                    }
+                    replyToSender.send(msg);
+                }
                 if (opts.receiveRate) {
                     qpid::sys::AbsTime waitTill(start, count*interval);
                     int64_t delay = qpid::sys::Duration(qpid::sys::now(), 
waitTill);
                     if (delay > 0) 
qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
                 }
-                //opts.rejectFrequency??
+                // Clear out message properties & content for next iteration.
+                msg = Message(); // TODO aconway 2010-12-01: should be done by 
fetch
             }
             if (opts.reportTotal) reporter.report();
             if (opts.tx) {

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp?rev=1049286&r1=1049285&r2=1049286&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp Tue Dec 14 21:30:55 2010
@@ -23,6 +23,7 @@
 #include <qpid/messaging/Connection.h>
 #include <qpid/messaging/Message.h>
 #include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Receiver.h>
 #include <qpid/messaging/Session.h>
 #include <qpid/messaging/FailoverUpdates.h>
 #include <qpid/sys/Time.h>
@@ -71,6 +72,7 @@ struct Options : public qpid::Options
     uint reportEvery;
     bool reportHeader;
     uint sendRate;
+    uint flowControl;
     bool sequence;
     bool timestamp;
 
@@ -94,6 +96,7 @@ struct Options : public qpid::Options
           reportEvery(0),
           reportHeader(true),
           sendRate(0),
+          flowControl(0),
           sequence(true),
           timestamp(true)
     {
@@ -122,6 +125,7 @@ struct Options : public qpid::Options
             ("report-every", qpid::optValue(reportEvery,"N"), "Report 
throughput statistics every N messages")
             ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers 
on report.")
             ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N 
messages/second. 0 means send as fast as possible.")
+            ("flow-control", qpid::optValue(flowControl,"N"), "Do end to end 
flow control to limit queue depth to 2*N. 0 means no flow control.")
             ("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence 
number messages property (required for duplicate/lost message detection)")
             ("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time 
stamp messages property (required for latency measurement)")
             ("help", qpid::optValue(help), "print this usage statement");
@@ -286,6 +290,14 @@ int main(int argc, char ** argv)
             int64_t interval = 0;
             if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
 
+            Receiver flowControlReceiver;
+            Address flowControlAddress(Uuid(true).str()+";{create:always}");
+            uint flowSent = 0;
+            if (opts.flowControl) {
+                flowControlReceiver = 
session.createReceiver(flowControlAddress);
+                flowControlReceiver.setCapacity(2);
+            }
+
             while (contentGen->setContent(msg)) {
                 ++sent;
                 if (opts.sequence)
@@ -293,6 +305,11 @@ int main(int argc, char ** argv)
                 if (opts.timestamp)
                     msg.getProperties()[TS] = int64_t(
                         qpid::sys::Duration(qpid::sys::EPOCH, 
qpid::sys::now()));
+                if (opts.flowControl && ((sent % opts.flowControl) == 0)) {
+                    msg.setReplyTo(flowControlAddress);
+                    ++flowSent;
+                }
+
                 sender.send(msg);
                 reporter.message(msg);
                 if (opts.tx && (sent % opts.tx == 0)) {
@@ -303,12 +320,18 @@ int main(int argc, char ** argv)
                         session.commit();
                 }
                 if (opts.messages && sent >= opts.messages) break;
+
+                if (opts.flowControl && flowSent == 2) {
+                    flowControlReceiver.get(Duration::SECOND*1);
+                    --flowSent;
+                }
+
                 if (opts.sendRate) {
                     qpid::sys::AbsTime waitTill(start, sent*interval);
                     int64_t delay = qpid::sys::Duration(qpid::sys::now(), 
waitTill);
-                    if (delay > 0)
-                        qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+                    if (delay > 0) 
qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
                 }
+                msg = Message(); // Clear out contents and properties for next 
iteration
             }
             if (opts.reportTotal) reporter.report();
             for (uint i = opts.sendEos; i > 0; --i) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org

Reply via email to