Author: gsim
Date: Thu Apr 30 20:39:39 2015
New Revision: 1677064
URL: http://svn.apache.org/r1677064
Log:
QPID-6526: make sure that the creation of proton links is done with the lock held
Modified:
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1677064&r1=1677063&r2=1677064&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Thu Apr 30 20:39:39 2015
@@ -417,7 +417,6 @@ void ConnectionContext::detach(boost::sh
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn,
boost::shared_ptr<SenderContext> lnk)
{
- sys::Monitor::ScopedLock l(lock);
lnk->configure();
attach(ssn, lnk->sender);
checkClosed(ssn, lnk);
@@ -427,7 +426,6 @@ void ConnectionContext::attach(boost::sh
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn,
boost::shared_ptr<ReceiverContext> lnk)
{
- sys::Monitor::ScopedLock l(lock);
lnk->configure();
attach(ssn, lnk->receiver, lnk->capacity);
checkClosed(ssn, lnk);
@@ -447,6 +445,43 @@ void ConnectionContext::attach(boost::sh
}
}
+boost::shared_ptr<SenderContext>
ConnectionContext::createSender(boost::shared_ptr<SessionContext> session,
const qpid::messaging::Address& address)
+{
+ sys::Monitor::ScopedLock l(lock);
+ boost::shared_ptr<SenderContext> sender = session->createSender(address,
setToOnSend);
+ try {
+ attach(session, sender);
+ return sender;
+ } catch (...) {
+ session->removeSender(sender->getName());
+ throw;
+ }
+
+}
+boost::shared_ptr<ReceiverContext>
ConnectionContext::createReceiver(boost::shared_ptr<SessionContext> session,
const qpid::messaging::Address& address)
+{
+ sys::Monitor::ScopedLock l(lock);
+ boost::shared_ptr<ReceiverContext> receiver =
session->createReceiver(address);
+ try {
+ attach(session, receiver);
+ return receiver;
+ } catch (...) {
+ session->removeReceiver(receiver->getName());
+ throw;
+ }
+}
+boost::shared_ptr<SenderContext>
ConnectionContext::getSender(boost::shared_ptr<SessionContext> session, const
std::string& name) const
+{
+ sys::Monitor::ScopedLock l(lock);
+ return session->getSender(name);
+}
+
+boost::shared_ptr<ReceiverContext>
ConnectionContext::getReceiver(boost::shared_ptr<SessionContext> session, const
std::string& name) const
+{
+ sys::Monitor::ScopedLock l(lock);
+ return session->getReceiver(name);
+}
+
void ConnectionContext::send(
boost::shared_ptr<SessionContext> ssn,
boost::shared_ptr<SenderContext> snd,
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1677064&r1=1677063&r2=1677064&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Thu Apr 30 20:39:39 2015
@@ -76,8 +76,11 @@ class ConnectionContext : public qpid::s
boost::shared_ptr<SessionContext> newSession(bool transactional, const
std::string& name);
boost::shared_ptr<SessionContext> getSession(const std::string& name)
const;
void endSession(boost::shared_ptr<SessionContext>);
- void attach(boost::shared_ptr<SessionContext>,
boost::shared_ptr<SenderContext>);
- void attach(boost::shared_ptr<SessionContext>,
boost::shared_ptr<ReceiverContext>);
+ boost::shared_ptr<SenderContext>
createSender(boost::shared_ptr<SessionContext>, const qpid::messaging::Address&
address);
+ boost::shared_ptr<ReceiverContext>
createReceiver(boost::shared_ptr<SessionContext>, const
qpid::messaging::Address& address);
+ boost::shared_ptr<SenderContext>
getSender(boost::shared_ptr<SessionContext>, const std::string& name) const;
+ boost::shared_ptr<ReceiverContext>
getReceiver(boost::shared_ptr<SessionContext>, const std::string& name) const;
+
void detach(boost::shared_ptr<SessionContext>,
boost::shared_ptr<SenderContext>);
void detach(boost::shared_ptr<SessionContext>,
boost::shared_ptr<ReceiverContext>);
void drain_and_release_messages(boost::shared_ptr<SessionContext>,
boost::shared_ptr<ReceiverContext>);
@@ -191,6 +194,8 @@ class ConnectionContext : public qpid::s
void wakeupDriver();
void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0);
+ void attach(boost::shared_ptr<SessionContext>,
boost::shared_ptr<SenderContext>);
+ void attach(boost::shared_ptr<SessionContext>,
boost::shared_ptr<ReceiverContext>);
void autoconnect();
bool tryConnectUrl(const qpid::Url& url);
bool tryOpenAddr(const qpid::Address& address);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp?rev=1677064&r1=1677063&r2=1677064&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp Thu Apr 30 20:39:39 2015
@@ -84,26 +84,14 @@ void SessionHandle::sync(bool block)
qpid::messaging::Sender SessionHandle::createSender(const
qpid::messaging::Address& address)
{
- boost::shared_ptr<SenderContext> sender = session->createSender(address,
connection->setToOnSend);
- try {
- connection->attach(session, sender);
- return qpid::messaging::Sender(new SenderHandle(connection, session,
sender));
- } catch (...) {
- session->removeSender(sender->getName());
- throw;
- }
+ boost::shared_ptr<SenderContext> sender =
connection->createSender(session, address);
+ return qpid::messaging::Sender(new SenderHandle(connection, session,
sender));
}
qpid::messaging::Receiver SessionHandle::createReceiver(const
qpid::messaging::Address& address)
{
- boost::shared_ptr<ReceiverContext> receiver =
session->createReceiver(address);
- try {
- connection->attach(session, receiver);
- return qpid::messaging::Receiver(new ReceiverHandle(connection,
session, receiver));
- } catch (...) {
- session->removeReceiver(receiver->getName());
- throw;
- }
+ boost::shared_ptr<ReceiverContext> receiver =
connection->createReceiver(session, address);
+ return qpid::messaging::Receiver(new ReceiverHandle(connection, session,
receiver));
}
bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout)
@@ -137,12 +125,12 @@ uint32_t SessionHandle::getUnsettledAcks
Sender SessionHandle::getSender(const std::string& name) const
{
- return qpid::messaging::Sender(new SenderHandle(connection, session,
session->getSender(name)));
+ return qpid::messaging::Sender(new SenderHandle(connection, session,
connection->getSender(session, name)));
}
Receiver SessionHandle::getReceiver(const std::string& name) const
{
- return qpid::messaging::Receiver(new ReceiverHandle(connection, session,
session->getReceiver(name)));
+ return qpid::messaging::Receiver(new ReceiverHandle(connection, session,
connection->getReceiver(session, name)));
}
Connection SessionHandle::getConnection() const
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]