I wrote a program who sends 1000 messages per second on testing address
continuously
The program below, read these messages, and after 10000 messages, create a
new session and receptor with same configuration as before
The receptor and session get out of scope before creating the new one, but
if I don't call session.close(); the memory on broker increase quickly
If I call session.close(); before creating the new session instance, it
doesn't happen
The topic was created with...
qpid-config add exchange topic testing
Looks like if reception.close(); or reception out of scope is not enough
to remove the "suscription" on broker side and the broker save all the
messages for a deleted reception and session objects
If so, it will be necessary to use just a reception per session in order to
let call session.close(); when a reception is not required by the program
anymore
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <iostream>
using namespace qpid::messaging;
int main(int /*argc*/, char** /*argv*/) {
std::string broker = "localhost:5672";
std::string address = "testing";
Connection connection(broker, "");
try {
connection.open();
{
Session session = connection.createSession();
std::string receiver_config = address + "/#;
{assert:always, node:{type:topic, durable:False},
link:{reliability:unreliable, durable:False} }";
Receiver receiver = session.createReceiver(receiver_config);
receiver.setCapacity(10);
for(int counter=0; counter<10000; ++counter)
Message message = receiver.fetch(Duration::SECOND * 1);
//session.close(); // this line commented, generates a
fast increase of memory on broker
}
std::cout << "other subscription receiver gets out of scope" <<
std::endl;
{
Session session = connection.createSession();
std::string receiver_config = address + "/#;
{assert:always, node:{type:topic, durable:False},
link:{reliability:unreliable, durable:False} }";
Receiver receiver = session.createReceiver(receiver_config);
for(int counter=0; counter<60000; ++counter)
Message message = receiver.fetch(Duration::SECOND * 1);
}
connection.close();
return 0;
} catch(const std::exception& error) {
std::cerr << error.what() << std::endl;
connection.close();
return 1;
}
}
--
View this message in context:
http://old.nabble.com/memory-consumption-on-broker-not-closing-session-tp32503958p32503958.html
Sent from the Qpid Developers mailing list archive at Nabble.com.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]