Author: gsim
Date: Tue Nov 17 17:34:55 2009
New Revision: 881394
URL: http://svn.apache.org/viewvc?rev=881394&view=rev
Log:
QPID-664: Allow an application to set a session's name and to retrieve the
session by that name; close all sessions when the connection is closed.
Modified:
qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h?rev=881394&r1=881393&r2=881394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h Tue Nov 17 17:34:55 2009
@@ -48,7 +48,8 @@
QPID_CLIENT_EXTERN ~Connection();
QPID_CLIENT_EXTERN Connection& operator=(const Connection&);
QPID_CLIENT_EXTERN void close();
- QPID_CLIENT_EXTERN Session newSession();
+ QPID_CLIENT_EXTERN Session newSession(const std::string& name =
std::string());
+ QPID_CLIENT_EXTERN Session getSession(const std::string& name) const;
private:
friend class qpid::client::PrivateImplRef<Connection>;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=881394&r1=881393&r2=881394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Tue Nov 17 17:34:55 2009
@@ -22,14 +22,17 @@
#include "SessionImpl.h"
#include "qpid/messaging/Session.h"
#include "qpid/client/PrivateImplRef.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include <boost/intrusive_ptr.hpp>
+#include <vector>
namespace qpid {
namespace client {
namespace amqp0_10 {
using qpid::messaging::Variant;
+using qpid::framing::Uuid;
using namespace qpid::sys;
template <class T> void setIfFound(const Variant::Map& map, const std::string&
key, T& value)
@@ -73,6 +76,15 @@
void ConnectionImpl::close()
{
+ std::vector<std::string> names;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ for (Sessions::const_iterator i = sessions.begin(); i !=
sessions.end(); ++i) names.push_back(i->first);
+ }
+ for (std::vector<std::string>::const_iterator i = names.begin(); i !=
names.end(); ++i) {
+ getSession(*i).close();
+ }
+
qpid::sys::Mutex::ScopedLock l(lock);
connection.close();
}
@@ -88,22 +100,34 @@
{
qpid::sys::Mutex::ScopedLock l(lock);
for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
- if (getImplPtr(*i).get() == &s) {
+ if (getImplPtr(i->second).get() == &s) {
sessions.erase(i);
break;
}
}
}
-qpid::messaging::Session ConnectionImpl::newSession()
+qpid::messaging::Session ConnectionImpl::getSession(const std::string& name)
const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ Sessions::const_iterator i = sessions.find(name);
+ if (i == sessions.end()) {
+ throw qpid::messaging::KeyError("No such session: " + name);
+ } else {
+ return i->second;
+ }
+}
+
+qpid::messaging::Session ConnectionImpl::newSession(const std::string& n)
{
+ std::string name = n.empty() ? Uuid(true).str() : n;
qpid::messaging::Session impl(new SessionImpl(*this));
{
qpid::sys::Mutex::ScopedLock l(lock);
- sessions.push_back(impl);
+ sessions[name] = impl;
}
try {
- getImplPtr(impl)->setSession(connection.newSession());
+ getImplPtr(impl)->setSession(connection.newSession(name));
} catch (const TransportFailure&) {
reconnect();
}
@@ -172,7 +196,7 @@
try {
qpid::sys::Mutex::ScopedLock l(lock);
for (Sessions::iterator i = sessions.begin(); i != sessions.end();
++i) {
- getImplPtr(*i)->setSession(connection.newSession());
+ getImplPtr(i->second)->setSession(connection.newSession(i->first));
}
return true;
} catch (const TransportFailure&) {
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=881394&r1=881393&r2=881394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Tue Nov 17 17:34:55 2009
@@ -29,7 +29,7 @@
#include "qpid/client/ConnectionSettings.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Semaphore.h"
-#include <vector>
+#include <map>
namespace qpid {
namespace client {
@@ -42,13 +42,14 @@
public:
ConnectionImpl(const std::string& url, const
qpid::messaging::Variant::Map& options);
void close();
- qpid::messaging::Session newSession();
+ qpid::messaging::Session newSession(const std::string& name);
+ qpid::messaging::Session getSession(const std::string& name) const;
void closed(SessionImpl&);
void reconnect();
private:
- typedef std::vector<qpid::messaging::Session> Sessions;
+ typedef std::map<std::string, qpid::messaging::Session> Sessions;
- qpid::sys::Mutex lock;//used to protect data structures
+ mutable qpid::sys::Mutex lock;//used to protect data structures
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
Sessions sessions;
qpid::client::Connection connection;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=881394&r1=881393&r2=881394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp Tue Nov 17 17:34:55 2009
@@ -50,7 +50,8 @@
Connection::~Connection() { PI::dtor(*this); }
void Connection::close() { impl->close(); }
-Session Connection::newSession() { return impl->newSession(); }
+Session Connection::newSession(const std::string& name) { return
impl->newSession(name); }
+Session Connection::getSession(const std::string& name) const { return
impl->getSession(name); }
InvalidOptionString::InvalidOptionString(const std::string& msg) :
Exception(msg) {}
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h?rev=881394&r1=881393&r2=881394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h Tue Nov 17 17:34:55 2009
@@ -37,7 +37,8 @@
public:
virtual ~ConnectionImpl() {}
virtual void close() = 0;
- virtual Session newSession() = 0;
+ virtual Session newSession(const std::string& name) = 0;
+ virtual Session getSession(const std::string& name) const = 0;
private:
};
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=881394&r1=881393&r2=881394&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Tue Nov 17 17:34:55 2009
@@ -704,7 +704,6 @@
BOOST_CHECK_THROW(fix.session.getSender("UnknownSender"),
qpid::messaging::KeyError);
}
-
QPID_AUTO_TEST_CASE(testGetReceiver)
{
QueueFixture fix;
@@ -719,6 +718,19 @@
BOOST_CHECK_THROW(fix.session.getReceiver("UnknownReceiver"),
qpid::messaging::KeyError);
}
+QPID_AUTO_TEST_CASE(testGetSession)
+{
+ QueueFixture fix;
+ fix.connection.newSession("my-session");
+ Session session = fix.connection.getSession("my-session");
+ Message out(Uuid(true).str());
+ session.createSender(fix.queue).send(out);
+ Message in;
+ BOOST_CHECK(session.createReceiver(fix.queue).fetch(in));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ BOOST_CHECK_THROW(fix.connection.getSession("UnknownSession"),
qpid::messaging::KeyError);
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]