Performance problems
--------------------

                 Key: QPID-2652
                 URL: https://issues.apache.org/jira/browse/QPID-2652
             Project: Qpid
          Issue Type: Bug
          Components: Python Client
         Environment: Debian GNU Linux squeeze, QPID 0.6/trunk
            Reporter: Cajus Pollmeier


When using the Python client, I noticed a significant performance
difference between the Python and C++ services. Running the test client below
takes ~10s with the C++ request/response service and ~40s with the Python one.
A factor 4 slowdown does not make a good case for the Python libs.

C++ request/response:

#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Variant.h>

#include <algorithm>
#include <cctype>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <sstream>
#include <pthread.h>

using namespace qpid::messaging;

using std::stringstream;
using std::string;

// Connection shared by the whole program: main() assigns it and every
// worker thread in run() opens its own session on it.
Connection connection;

/**
 * Upper-cases a single character.
 *
 * The value is routed through unsigned char before it reaches std::toupper:
 * handing std::toupper a negative value (any byte >= 0x80 on platforms where
 * char is signed) is undefined behaviour.
 */
static char upper_char(char c) {
    return static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
}

/**
 * Worker thread entry point (pthread start routine).
 *
 * Opens a session on the global connection, creates a receiver on
 * "requests" and then loops forever: each fetched request is answered with
 * its content converted to upper case, sent to the request's reply-to
 * address, and acknowledged. Requests without a reply-to address, and
 * requests whose processing throws a std::exception, are rejected.
 *
 * @param conn_ptr unused; the worker uses the global connection.
 * @return never returns normally (the loop has no exit). An exception thrown
 *         by fetch() itself is not caught here.
 */
void * run(void *conn_ptr) {
    static_cast<void>(conn_ptr);  // unused: the global connection is used instead

    Session session = connection.newSession();
    Receiver receiver = session.createReceiver("requests; {create: always}");
    receiver.setCapacity(2);
    while (true) {
        Message request = receiver.fetch();
        try {
            const Address& address = request.getReplyTo();
            if (address) {
                const std::string content = request.getContent();
                // Look the reply address up once and reuse it below.
                const std::string replyName = address.getName();

                std::cout << "Processing request: "
                          << content
                          << " -> "
                          << replyName << std::endl;

                Sender sender = session.createSender(replyName);
                std::string upper(content);
                std::transform(upper.begin(), upper.end(), upper.begin(), upper_char);
                Message response(upper);
                sender.send(response);

                std::cout << "Processed request: "
                          << content
                          << " -> "
                          << response.getContent() << std::endl;

                session.acknowledge();
            } else {
                std::cerr << "Error: no reply address specified for request: "
                          << request.getContent() << std::endl;
                session.reject(request);
            }
        } catch(const std::exception& error) {
            std::cout << "Something went wrong: " << error.what()  << std::endl;
            session.reject(request);
        }
    }
}

/**
 * Opens the connection and runs the worker threads.
 *
 * @param argc argument count.
 * @param argv argv[1], when present, is the URL to connect to; otherwise
 *             "amqp:tcp:127.0.0.1:5672" is used.
 * @return EXIT_SUCCESS when every worker thread could be started,
 *         EXIT_FAILURE otherwise. The workers loop without an exit
 *         condition, so the joins are not expected to complete in normal
 *         operation.
 */
int main(int argc, char** argv) {
    const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
    Variant::Map amqp_options;
    amqp_options["username"] = "admin";
    amqp_options["password"] = "secret";

    connection = Connection::open(url, amqp_options);

    const int thread_count = 10;
    pthread_t threads[thread_count];
    int started = 0;

    // Create the threads. Only successfully created threads are recorded, so
    // the join loop below never touches an uninitialised pthread_t.
    for (int i = 0; i < thread_count; i++) {
        const int rc = pthread_create(&threads[started], NULL, run, NULL);
        if (rc != 0) {
            std::cerr << "Error: could not start worker thread " << i
                      << " (pthread_create returned " << rc << ")" << std::endl;
            continue;
        }
        ++started;
    }

    // Get the threads back
    for (int i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
    }
    connection.close();
    return started == thread_count ? EXIT_SUCCESS : EXIT_FAILURE;
}

----
Python request/response:

import traceback
from qpid.messaging import *
from threading import Thread

class Processor(Thread):
  """Daemon worker thread that answers the requests received on one session.

  Each fetched message is passed to process(); the reply it returns is sent
  to the returned address and the request is acknowledged. If anything in
  that sequence raises, the traceback is printed and the request is
  acknowledged with a REJECTED disposition instead.
  """

  def __init__(self, ssn):
    """Store the session this worker fetches from and replies through."""
    Thread.__init__(self)
    self.setDaemon(True)
    self.ssn = ssn

  def run(self):
    """Fetch and answer requests forever.

    An exception raised by fetch() itself is not handled here and ends the
    thread.
    """
    while True:
      msg = self.ssn.next_receiver().fetch()
      try:
        to, reply = self.process(msg)
        # Reply through this worker's own session, not through whatever a
        # module-level name `ssn` happens to be bound to.
        snd = self.ssn.sender(to)
        try:
          snd.send(reply)
        finally:
          # A sender is created per reply; close it even when send() fails.
          snd.close()
        self.ssn.acknowledge(msg)
      except Exception:
        traceback.print_exc()
        print("rejecting: %s" % msg)
        self.ssn.acknowledge(msg, Disposition(REJECTED))

  def process(self, msg):
    """Return (reply address, reply message) for a request.

    The reply address is msg.reply_to and the reply content is the request
    content prefixed with "echo ".
    """
    print("processing %s" % msg)
    return msg.reply_to, Message("echo %s" % msg.content)

# Establish one connection to localhost with the admin credentials; every
# worker session below is created on it.
conn = Connection.establish("localhost", username='admin', password='secret')

# Started workers, kept so the main thread can join them below.
processors = []

# Start ten workers, each with its own session and a receiver on "requests"
# with capacity 10.
for i in range(10):
  ssn = conn.session()
  # rcv is not used directly: Processor.run obtains the receiver through
  # ssn.next_receiver().
  rcv = ssn.receiver("requests", capacity=10)
  proc = Processor(ssn)
  proc.start()
  processors.append(proc)

# This may look a bit weird, but the timed join keeps Python from
# swallowing interrupts.
while True:
  for p in processors:
    p.join(3)

---
Test-Client:

import traceback
from qpid.messaging import *
from threading import Thread

# Round-trip test client: sends 1000 requests to "requests", one at a time,
# and waits for (and prints) the reply to each before sending the next.
connection = Connection.establish("localhost")
session = connection.session(str(uuid4()))
request_sender = session.sender("requests")

# Replies arrive on an address named after the session; the receiver's
# address options ask for it to be created and deleted with the receiver.
reply_address = 'reply-%s' % session.name
reply_receiver = session.receiver(reply_address + '; {create:always, delete:always}')

for _ in range(1000):
    request = Message("The quick brown fox tests the performance")
    request.reply_to = reply_address
    request_sender.send(request)
    response = reply_receiver.fetch()
    print(response.content)



-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to