Author: gsim
Date: Wed Jun 25 14:47:51 2014
New Revision: 1605429

URL: http://svn.apache.org/r1605429
Log:
QPID-5828: more consistent behaviour os send() when disconnected

Modified:
    qpid/trunk/qpid/cpp/include/qpid/messaging/exceptions.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/exceptions.cpp

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/exceptions.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/exceptions.h?rev=1605429&r1=1605428&r2=1605429&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/exceptions.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/exceptions.h Wed Jun 25 14:47:51 
2014
@@ -155,6 +155,16 @@ struct QPID_MESSAGING_CLASS_EXTERN Targe
     QPID_MESSAGING_EXTERN TargetCapacityExceeded(const std::string&);
 };
 
+/**
+ * Thrown to indicate that the locally configured sender capacity has
+ * been reached, and thus no further messages can be put on the replay
+ * buffer.
+ */
+struct QPID_MESSAGING_CLASS_EXTERN OutOfCapacity : public SendError
+{
+    QPID_MESSAGING_EXTERN OutOfCapacity(const std::string&);
+};
+
 struct QPID_MESSAGING_CLASS_EXTERN SessionError : public MessagingException
 {
     QPID_MESSAGING_EXTERN SessionError(const std::string&);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1605429&r1=1605428&r2=1605429&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Wed Jun 25 
14:47:51 2014
@@ -392,5 +392,9 @@ bool ConnectionImpl::getAutoDecode() con
 {
     return !disableAutoDecode;
 }
+bool ConnectionImpl::getAutoReconnect() const
+{
+    return autoReconnect;
+}
 
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1605429&r1=1605428&r2=1605429&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Wed Jun 25 
14:47:51 2014
@@ -57,6 +57,7 @@ class ConnectionImpl : public qpid::mess
     void reconnect();
     std::string getUrl() const;
     bool getAutoDecode() const;
+    bool getAutoReconnect() const;
   private:
     typedef std::map<std::string, qpid::messaging::Session> Sessions;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=1605429&r1=1605428&r2=1605429&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Wed Jun 25 
14:47:51 2014
@@ -30,8 +30,8 @@ namespace client {
 namespace amqp0_10 {
 
 SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, 
-                       const qpid::messaging::Address& _address) : 
-    parent(&_parent), name(_name), address(_address), state(UNRESOLVED),
+                       const qpid::messaging::Address& _address, bool 
_autoReconnect) : 
+    parent(&_parent), autoReconnect(_autoReconnect), name(_name), 
address(_address), state(UNRESOLVED),
     capacity(50), window(0), flushed(false), 
unreliable(AddressResolution::is_unreliable(address)) {}
 
 qpid::messaging::Address SenderImpl::getAddress() const
@@ -100,21 +100,37 @@ void SenderImpl::init(qpid::client::Asyn
 void SenderImpl::waitForCapacity() 
 {
     sys::Mutex::ScopedLock l(lock);
-    //TODO: add option to throw exception rather than blocking?
-    if (!unreliable && capacity <=
-        (flushed ? checkPendingSends(false, l) : outgoing.size()))
-    {
-        //Initial implementation is very basic. As outgoing is
-        //currently only reduced on receiving completions and we are
-        //blocking anyway we may as well sync(). If successful that
-        //should clear all outstanding sends.
-        session.sync();
-        checkPendingSends(false, l);
-    }
-    //flush periodically and check for conmpleted sends
-    if (++window > (capacity / 4)) {//TODO: make this configurable?
-        checkPendingSends(true, l);
-        window = 0;
+    try {
+        //TODO: add option to throw exception rather than blocking?
+        if (!unreliable && capacity <=
+            (flushed ? checkPendingSends(false, l) : outgoing.size()))
+        {
+            //Initial implementation is very basic. As outgoing is
+            //currently only reduced on receiving completions and we are
+            //blocking anyway we may as well sync(). If successful that
+            //should clear all outstanding sends.
+            session.sync();
+            checkPendingSends(false, l);
+        }
+        //flush periodically and check for conmpleted sends
+        if (++window > (capacity / 4)) {//TODO: make this configurable?
+            checkPendingSends(true, l);
+            window = 0;
+        }
+    } catch (const qpid::TransportFailure&) {
+        //Disconnection prevents flushing or syncing. If we have any
+        //capacity we will return anyway (the subsequent attempt to
+        //send will fail, but message will be on replay buffer).
+        if (capacity > outgoing.size()) return;
+        //If we are out of capacity, but autoreconnect is on, then
+        //rethrow the transport failure to trigger reconnect which
+        //will have the effect of blocking until connected and
+        //capacity is freed up
+        if (autoReconnect) throw;
+        //Otherwise, in order to clearly signal to the application
+        //that the message was not pushed to replay buffer, throw an
+        //out of capacity error
+        throw qpid::messaging::OutOfCapacity(name);
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=1605429&r1=1605428&r2=1605429&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Wed Jun 25 
14:47:51 2014
@@ -47,7 +47,7 @@ class SenderImpl : public qpid::messagin
     enum State {UNRESOLVED, ACTIVE, CANCELLED};
 
     SenderImpl(SessionImpl& parent, const std::string& name, 
-               const qpid::messaging::Address& address);
+               const qpid::messaging::Address& address, bool autoReconnect);
     void send(const qpid::messaging::Message&, bool sync);
     void close();
     void setCapacity(uint32_t);
@@ -61,6 +61,7 @@ class SenderImpl : public qpid::messagin
   private:
     mutable sys::Mutex lock;
     boost::intrusive_ptr<SessionImpl> parent;
+    const bool autoReconnect;
     const std::string name;
     const qpid::messaging::Address address;
     State state;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1605429&r1=1605428&r2=1605429&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Wed Jun 25 
14:47:51 2014
@@ -233,7 +233,7 @@ Sender SessionImpl::createSenderImpl(con
     ScopedLock l(lock);
     std::string name = address.getName();
     getFreeKey(name, senders);
-    Sender sender(new SenderImpl(*this, name, address));
+    Sender sender(new SenderImpl(*this, name, address, 
connection->getAutoReconnect()));
     getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver);
     senders[name] = sender;
     return sender;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/exceptions.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/exceptions.cpp?rev=1605429&r1=1605428&r2=1605429&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/exceptions.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/exceptions.cpp Wed Jun 25 14:47:51 
2014
@@ -46,6 +46,7 @@ SenderError::SenderError(const std::stri
 SendError::SendError(const std::string& msg) : SenderError(msg) {}
 MessageRejected::MessageRejected(const std::string& msg) : SendError(msg) {}
 TargetCapacityExceeded::TargetCapacityExceeded(const std::string& msg) : 
SendError(msg) {}
+OutOfCapacity::OutOfCapacity(const std::string& msg) : SendError(msg) {}
 
 SessionError::SessionError(const std::string& msg) : MessagingException(msg) {}
 SessionClosed::SessionClosed() : SessionError("Session Closed") {}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to