Author: aconway
Date: Wed Dec 15 18:10:12 2010
New Revision: 1049656
URL: http://svn.apache.org/viewvc?rev=1049656&view=rev
Log:
Fix flow control for qpid-cpp-benchmark with multiple senders.
Ensure senders & receivers agree on number of messages sent/received.
Modified:
qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp
Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1049656&r1=1049655&r2=1049656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark Wed Dec 15 18:10:12 2010
@@ -75,12 +75,16 @@ def ssh_command(host, command):
"""Convert command into an ssh command on host with quoting"""
return ["ssh", host] + [posix_quote(arg) for arg in command]
-def start_receive(queue, opts, ready_queue, broker, host):
+def start_receive(queue, index, opts, ready_queue, broker, host):
address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option))
+ msg_total=opts.senders*opts.messages
+ messages = msg_total/opts.receivers;
+ if (index < msg_total%opts.receivers): messages += 1
+ if (messages == 0): return None
command = ["qpid-receive",
"-b", broker,
"-a", address,
- "-m", str((opts.senders*opts.messages)/opts.receivers),
+ "-m", str(messages),
"--forever",
"--print-content=no",
"--receive-rate", str(opts.receive_rate),
@@ -101,7 +105,6 @@ def start_send(queue, opts, broker, host
"-b", broker,
"-a", address,
"--messages", str(opts.messages),
- "--send-eos", str(opts.receivers),
"--content-size", str(opts.content_size),
"--send-rate", str(opts.send_rate),
"--report-total",
@@ -118,7 +121,7 @@ def start_send(queue, opts, broker, host
def first_line(p):
out,err=p.communicate()
- if p.returncode != 0: raise Exception("ERROR:\n%s"%(out))
+ if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip()))
return out.split("\n")[0]
def delete_queues(queues, broker):
@@ -144,7 +147,7 @@ def parse_senders(senders):
return parse([int],[first_line(p) for p in senders])
def parse_receivers(receivers):
- return parse([int,float,float,float],[first_line(p) for p in receivers])
+ return parse([int,float,float,float],[first_line(p) for p in receivers if p])
def print_data(send_stats, recv_stats):
for send,recv in map(None, send_stats, recv_stats):
@@ -216,9 +219,9 @@ def main():
for i in xrange(opts.repeat):
delete_queues(queues, opts.broker[0])
ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
- receivers = [start_receive(q, opts, ready_queue, brokers.next(), client_hosts.next())
+ receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.receivers)]
- ready_receiver.wait(receivers) # Wait for receivers to be ready.
+ ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
senders = [start_send(q, opts,brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.senders)]
if opts.report_header and i == 0: print_header(opts.timestamp)
Modified: qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp?rev=1049656&r1=1049655&r2=1049656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp Wed Dec 15 18:10:12 2010
@@ -191,8 +191,7 @@ int main(int argc, char ** argv)
int64_t interval = 0;
if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate;
- Address replyToAddress;
- Sender replyToSender;
+ std::map<std::string,Sender> replyTo;
while (!done && receiver.fetch(msg, timeout)) {
reporter.message(msg);
@@ -227,12 +226,12 @@ int main(int argc, char ** argv)
session.acknowledge();
}
if (msg.getReplyTo()) { // Echo message back to reply-to address.
- if (msg.getReplyTo() != replyToAddress) {
- replyToSender = session.createSender(msg.getReplyTo());
- replyToSender.setCapacity(opts.capacity);
- replyToAddress = msg.getReplyTo();
+ Sender& s = replyTo[msg.getReplyTo().str()];
+ if (s.isNull()) {
+ s = session.createSender(msg.getReplyTo());
+ s.setCapacity(opts.capacity);
}
- replyToSender.send(msg);
+ s.send(msg);
}
if (opts.receiveRate) {
qpid::sys::AbsTime waitTill(start, count*interval);
Modified: qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp?rev=1049656&r1=1049655&r2=1049656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp Wed Dec 15 18:10:12 2010
@@ -291,7 +291,7 @@ int main(int argc, char ** argv)
if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
Receiver flowControlReceiver;
- Address flowControlAddress(Uuid(true).str()+";{create:always}");
+ Address flowControlAddress("flow-"+Uuid(true).str()+";{create:always,delete:always}");
uint flowSent = 0;
if (opts.flowControl) {
flowControlReceiver = session.createReceiver(flowControlAddress);
@@ -322,7 +322,7 @@ int main(int argc, char ** argv)
if (opts.messages && sent >= opts.messages) break;
if (opts.flowControl && flowSent == 2) {
- flowControlReceiver.get(Duration::SECOND*1);
+ flowControlReceiver.get(Duration::SECOND);
--flowSent;
}
@@ -333,11 +333,13 @@ int main(int argc, char ** argv)
}
msg = Message(); // Clear out contents and properties for next iteration
}
+ for ( ; flowSent>0; --flowSent)
+ flowControlReceiver.get(Duration::SECOND);
if (opts.reportTotal) reporter.report();
for (uint i = opts.sendEos; i > 0; --i) {
if (opts.sequence)
msg.getProperties()[SN] = ++sent;
- msg.setContent(EOS);//TODO: add in ability to send digest or similar
+ msg.setContent(EOS); //TODO: add in ability to send digest or similar
sender.send(msg);
}
if (opts.tx) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]