Author: gsim
Date: Wed Aug 28 14:56:46 2013
New Revision: 1518233

URL: http://svn.apache.org/r1518233
Log:
QPID-4708: support for reconnect over AMQP 1.0

Modified:
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
    qpid/trunk/qpid/cpp/src/tests/interlink_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1518233&r1=1518232&r2=1518233&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Wed Aug 
28 14:56:46 2013
@@ -45,18 +45,17 @@ namespace qpid {
 namespace messaging {
 namespace amqp {
 
-ConnectionContext::ConnectionContext(const std::string& u, const 
qpid::types::Variant::Map& o)
+ConnectionContext::ConnectionContext(const std::string& url, const 
qpid::types::Variant::Map& o)
     : qpid::messaging::ConnectionOptions(o),
-      url(u, protocol.empty() ? qpid::Address::TCP : protocol),
       engine(pn_transport()),
       connection(pn_connection()),
       //note: disabled read/write of header as now handled by engine
       writeHeader(false),
       readHeader(false),
       haveOutput(false),
-      state(DISCONNECTED),
-      codecSwitch(*this)
+      state(DISCONNECTED)
 {
+    urls.insert(urls.begin(), url);
     if (pn_transport_bind(engine, connection)) {
         //error
     }
@@ -77,67 +76,6 @@ ConnectionContext::~ConnectionContext()
     pn_connection_free(connection);
 }
 
-namespace {
-const std::string COLON(":");
-}
-void ConnectionContext::open()
-{
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
-    if (state != DISCONNECTED) throw 
qpid::messaging::ConnectionError("Connection was already opened!");
-    if (!driver) driver = DriverImpl::getDefault();
-    if (url.getUser().size()) username = url.getUser();
-    if (url.getPass().size()) password = url.getPass();
-
-    for (Url::const_iterator i = url.begin(); state != CONNECTED && i != 
url.end(); ++i) {
-        transport = driver->getTransport(i->protocol, *this);
-        std::stringstream port;
-        port << i->port;
-        id = i->host + COLON + port.str();
-        if (useSasl()) {
-            sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, i->host));
-        }
-        state = CONNECTING;
-        try {
-            QPID_LOG(debug, id << " Connecting ...");
-            transport->connect(i->host, port.str());
-        } catch (const std::exception& e) {
-            QPID_LOG(info, id << " Error while connecting: " << e.what());
-        }
-        while (state == CONNECTING) {
-            lock.wait();
-        }
-        if (state == DISCONNECTED) {
-            QPID_LOG(debug, id << " Failed to connect");
-            transport = boost::shared_ptr<Transport>();
-        } else {
-            QPID_LOG(debug, id << " Connected");
-        }
-    }
-
-    if (state != CONNECTED) throw 
qpid::messaging::TransportFailure(QPID_MSG("Could not connect to " << url));
-
-    if (sasl.get()) {
-        wakeupDriver();
-        while (!sasl->authenticated()) {
-            QPID_LOG(debug, id << " Waiting to be authenticated...");
-            wait();
-        }
-        QPID_LOG(debug, id << " Authenticated");
-    }
-
-    QPID_LOG(debug, id << " Opening...");
-    setProperties();
-    pn_connection_open(connection);
-    wakeupDriver(); //want to write
-    while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
-        wait();
-    }
-    if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
-        throw qpid::messaging::ConnectionError("Failed to open connection");
-    }
-    QPID_LOG(debug, id << " Opened");
-}
-
 bool ConnectionContext::isOpen() const
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
@@ -323,45 +261,26 @@ void ConnectionContext::detach(boost::sh
 
 void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, 
boost::shared_ptr<SenderContext> lnk)
 {
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     lnk->configure();
-    attach(ssn->session, (pn_link_t*) lnk->sender);
-    pn_terminus_t* t = pn_link_remote_target(lnk->sender);
-    if (!pn_terminus_get_address(t)) {
-        std::string msg("No such target : ");
-        msg += lnk->getTarget();
-        QPID_LOG(debug, msg);
-        throw qpid::messaging::NotFound(msg);
-    } else if (AddressImpl::isTemporary(lnk->address)) {
-        lnk->address.setName(pn_terminus_get_address(t));
-        QPID_LOG(debug, "Dynamic target name set to " << 
lnk->address.getName());
-    }
-    lnk->verify(t);
+    attach(lnk->sender);
+    lnk->verify();
     checkClosed(ssn, lnk);
     QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
 }
 
 void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, 
boost::shared_ptr<ReceiverContext> lnk)
 {
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     lnk->configure();
-    attach(ssn->session, lnk->receiver, lnk->capacity);
-    pn_terminus_t* s = pn_link_remote_source(lnk->receiver);
-    if (!pn_terminus_get_address(s)) {
-        std::string msg("No such source : ");
-        msg += lnk->getSource();
-        QPID_LOG(debug, msg);
-        throw qpid::messaging::NotFound(msg);
-    } else if (AddressImpl::isTemporary(lnk->address)) {
-        lnk->address.setName(pn_terminus_get_address(s));
-        QPID_LOG(debug, "Dynamic source name set to " << 
lnk->address.getName());
-    }
-    lnk->verify(s);
+    attach(lnk->receiver, lnk->capacity);
+    lnk->verify();
     checkClosed(ssn, lnk);
     QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
 }
 
-void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int 
credit)
+void ConnectionContext::attach(pn_link_t* link, int credit)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     pn_link_open(link);
     QPID_LOG(debug, "Link attach sent for " << link << ", state=" << 
pn_link_state(link));
     if (credit) pn_link_flow(link, credit);
@@ -457,10 +376,32 @@ pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACT
 pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED;
 }
 
+void ConnectionContext::reset()
+{
+    pn_transport_free(engine);
+    pn_connection_free(connection);
+
+    engine = pn_transport();
+    connection = pn_connection();
+    pn_connection_set_container(connection, identifier.c_str());
+    bool enableTrace(false);
+    QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
+    if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM);
+    for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+        i->second->reset(connection);
+    }
+    pn_transport_bind(engine, connection);
+}
+
 void ConnectionContext::check()
 {
     if (state == DISCONNECTED) {
-        throw qpid::messaging::TransportFailure("Disconnected");
+        if (ConnectionOptions::reconnect) {
+            reset();
+            reconnect();
+        } else {
+            throw qpid::messaging::TransportFailure("Disconnected (reconnect 
disabled)");
+        }
     }
     if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
         pn_connection_close(connection);
@@ -510,6 +451,7 @@ void ConnectionContext::waitUntil(boost:
 }
 void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
 {
+    check();
     if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
         pn_session_close(ssn->session);
         throw qpid::messaging::SessionError("Session ended by peer");
@@ -543,6 +485,31 @@ void ConnectionContext::checkClosed(boos
         throw qpid::messaging::LinkError("Link is not attached");
     }
 }
+
+void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s)
+{
+    pn_session_open(s->session);
+    wakeupDriver();
+    while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
+        wait();
+    }
+
+    for (SessionContext::SenderMap::iterator i = s->senders.begin(); i != 
s->senders.end(); ++i) {
+        QPID_LOG(debug, id << " reattaching sender " << i->first);
+        attach(i->second->sender);
+        i->second->verify();
+        QPID_LOG(debug, id << " sender " << i->first << " reattached");
+        i->second->resend();
+    }
+    for (SessionContext::ReceiverMap::iterator i = s->receivers.begin(); i != 
s->receivers.end(); ++i) {
+        QPID_LOG(debug, id << " reattaching receiver " << i->first);
+        attach(i->second->receiver, i->second->capacity);
+        i->second->verify();
+        QPID_LOG(debug, id << " receiver " << i->first << " reattached");
+    }
+    wakeupDriver();
+}
+
 boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool 
transactional, const std::string& n)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
@@ -585,7 +552,7 @@ std::string ConnectionContext::getAuthen
     return sasl.get() ? sasl->getAuthenticatedUsername() : std::string();
 }
 
-std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
+std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t 
size)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     QPID_LOG(trace, id << " decode(" << size << ")");
@@ -615,7 +582,7 @@ std::size_t ConnectionContext::decode(co
     }
 
 }
-std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
+std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     QPID_LOG(trace, id << " encode(" << size << ")");
@@ -642,7 +609,7 @@ std::size_t ConnectionContext::encode(ch
         return 0;
     }
 }
-bool ConnectionContext::canEncode()
+bool ConnectionContext::canEncodePlain()
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     return haveOutput && state == CONNECTED;
@@ -716,47 +683,46 @@ bool ConnectionContext::useSasl()
 
 qpid::sys::Codec& ConnectionContext::getCodec()
 {
-    return codecSwitch;
+    return *this;
 }
 
-ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p) : parent(p) 
{}
-std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, 
std::size_t size)
+std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     size_t decoded = 0;
-    if (parent.sasl.get() && !parent.sasl->authenticated()) {
-        decoded = parent.sasl->decode(buffer, size);
-        if (!parent.sasl->authenticated()) return decoded;
+    if (sasl.get() && !sasl->authenticated()) {
+        decoded = sasl->decode(buffer, size);
+        if (!sasl->authenticated()) return decoded;
     }
     if (decoded < size) {
-        if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += 
parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
-        else decoded += parent.decode(buffer+decoded, size-decoded);
+        if (sasl.get() && sasl->getSecurityLayer()) decoded += 
sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
+        else decoded += decodePlain(buffer+decoded, size-decoded);
     }
     return decoded;
 }
-std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t 
size)
+std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     size_t encoded = 0;
-    if (parent.sasl.get() && parent.sasl->canEncode()) {
-        encoded += parent.sasl->encode(buffer, size);
-        if (!parent.sasl->authenticated()) return encoded;
+    if (sasl.get() && sasl->canEncode()) {
+        encoded += sasl->encode(buffer, size);
+        if (!sasl->authenticated()) return encoded;
     }
     if (encoded < size) {
-        if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += 
parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
-        else encoded += parent.encode(buffer+encoded, size-encoded);
+        if (sasl.get() && sasl->getSecurityLayer()) encoded += 
sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
+        else encoded += encodePlain(buffer+encoded, size-encoded);
     }
     return encoded;
 }
-bool ConnectionContext::CodecSwitch::canEncode()
+bool ConnectionContext::canEncode()
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
-    if (parent.sasl.get()) {
-        if (parent.sasl->canEncode()) return true;
-        else if (!parent.sasl->authenticated()) return false;
-        else if (parent.sasl->getSecurityLayer()) return 
parent.sasl->getSecurityLayer()->canEncode();
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (sasl.get()) {
+        if (sasl->canEncode()) return true;
+        else if (!sasl->authenticated()) return false;
+        else if (sasl->getSecurityLayer()) return 
sasl->getSecurityLayer()->canEncode();
     }
-    return parent.canEncode();
+    return canEncodePlain();
 }
 
 namespace {
@@ -794,4 +760,160 @@ const qpid::sys::SecuritySettings* Conne
     return transport ?  transport->getSecuritySettings() : 0;
 }
 
+void ConnectionContext::open()
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state != DISCONNECTED) throw 
qpid::messaging::ConnectionError("Connection was already opened!");
+    if (!driver) driver = DriverImpl::getDefault();
+
+    tryConnect();
+}
+
+
+namespace {
+std::string asString(const std::vector<std::string>& v) {
+    std::stringstream os;
+    os << "[";
+    for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); 
++i ) {
+        if (i != v.begin()) os << ", ";
+        os << *i;
+    }
+    os << "]";
+    return os.str();
+}
+double FOREVER(std::numeric_limits<double>::max());
+bool expired(const sys::AbsTime& start, double timeout)
+{
+    if (timeout == 0) return true;
+    if (timeout == FOREVER) return false;
+    qpid::sys::Duration used(start, qpid::sys::now());
+    qpid::sys::Duration allowed((int64_t)(timeout*qpid::sys::TIME_SEC));
+    return allowed < used;
+}
+const std::string COLON(":");
+}
+
+void ConnectionContext::reconnect()
+{
+    qpid::sys::AbsTime started(qpid::sys::now());
+    QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
+    for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, 
maxReconnectInterval)) {
+        if (!ConnectionOptions::reconnect) {
+            throw qpid::messaging::TransportFailure("Failed to connect 
(reconnect disabled)");
+        }
+        if (limit >= 0 && retries++ >= limit) {
+            throw qpid::messaging::TransportFailure("Failed to connect within 
reconnect limit");
+        }
+        if (expired(started, timeout)) {
+            throw qpid::messaging::TransportFailure("Failed to connect within 
reconnect timeout");
+        }
+        QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " 
microseconds, urls="
+                 << asString(urls));
+        qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
+    }
+    retries = 0;
+}
+
+bool ConnectionContext::tryConnect()
+{
+    for (std::vector<std::string>::const_iterator i = urls.begin(); i != 
urls.end(); ++i) {
+        try {
+            QPID_LOG(info, "Trying to connect to " << *i << "...");
+            if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP 
: protocol))) {
+                QPID_LOG(info, "Connected to " << *i);
+                if (sasl.get()) {
+                    wakeupDriver();
+                    while (!sasl->authenticated()) {
+                        QPID_LOG(debug, id << " Waiting to be 
authenticated...");
+                        wait();
+                    }
+                    QPID_LOG(debug, id << " Authenticated");
+                }
+
+                QPID_LOG(debug, id << " Opening...");
+                setProperties();
+                pn_connection_open(connection);
+                wakeupDriver(); //want to write
+                while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
+                    wait();
+                }
+                if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+                    throw qpid::messaging::ConnectionError("Failed to open 
connection");
+                }
+                QPID_LOG(debug, id << " Opened");
+
+                return restartSessions();
+            }
+        } catch (const qpid::messaging::TransportFailure& e) {
+            QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
+        }
+    }
+    return false;
+}
+
+bool ConnectionContext::tryConnect(const std::string& url)
+{
+    return tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : 
protocol));
+}
+
+bool ConnectionContext::tryConnect(const Url& url)
+{
+    if (url.getUser().size()) username = url.getUser();
+    if (url.getPass().size()) password = url.getPass();
+
+    for (Url::const_iterator i = url.begin(); i != url.end(); ++i) {
+        if (tryConnect(*i)) return true;
+    }
+    return false;
+}
+
+bool ConnectionContext::tryConnect(const qpid::Address& address)
+{
+    transport = driver->getTransport(address.protocol, *this);
+    std::stringstream port;
+    port << address.port;
+    id = address.host + COLON + port.str();
+    if (useSasl()) {
+        sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, address.host));
+    }
+    state = CONNECTING;
+    try {
+        QPID_LOG(debug, id << " Connecting ...");
+        transport->connect(address.host, port.str());
+        bool waiting(true);
+        while (waiting) {
+            switch (state) {
+              case CONNECTED:
+                QPID_LOG(debug, id << " Connected");
+                return true;
+              case CONNECTING:
+                lock.wait();
+                break;
+              case DISCONNECTED:
+                waiting = false;
+                QPID_LOG(debug, id << " Failed to connect");
+                break;
+            }
+        }
+    } catch (const std::exception& e) {
+        QPID_LOG(info, id << " Error while connecting: " << e.what());
+    }
+    transport = boost::shared_ptr<Transport>();
+    return false;
+}
+
+bool ConnectionContext::restartSessions()
+{
+    try {
+        for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); 
++i) {
+            restartSession(i->second);
+        }
+        return true;
+    } catch (const qpid::TransportFailure& e) {
+        QPID_LOG(debug, "Connection Failed to re-initialize sessions: " << 
e.what());
+        return false;
+    }
+}
+
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1518233&r1=1518232&r2=1518233&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Wed Aug 28 
14:56:46 2013
@@ -110,7 +110,6 @@ class ConnectionContext : public qpid::s
 
   private:
     typedef std::map<std::string, boost::shared_ptr<SessionContext> > 
SessionMap;
-    qpid::Url url;
 
     boost::shared_ptr<DriverImpl> driver;
     boost::shared_ptr<Transport> transport;
@@ -129,17 +128,6 @@ class ConnectionContext : public qpid::s
         CONNECTED
     } state;
     std::auto_ptr<Sasl> sasl;
-    class CodecSwitch : public qpid::sys::Codec
-    {
-      public:
-        CodecSwitch(ConnectionContext&);
-        std::size_t decode(const char* buffer, std::size_t size);
-        std::size_t encode(char* buffer, std::size_t size);
-        bool canEncode();
-      private:
-        ConnectionContext& parent;
-    };
-    CodecSwitch codecSwitch;
 
     void check();
     void wait();
@@ -155,7 +143,19 @@ class ConnectionContext : public qpid::s
     void checkClosed(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<SenderContext>);
     void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
     void wakeupDriver();
-    void attach(pn_session_t*, pn_link_t*, int credit=0);
+    void attach(pn_link_t*, int credit=0);
+    void reconnect();
+    bool tryConnect();
+    bool tryConnect(const std::string& url);
+    bool tryConnect(const qpid::Url& url);
+    bool tryConnect(const qpid::Address& address);
+    void reset();
+    bool restartSessions();
+    void restartSession(boost::shared_ptr<SessionContext>);
+
+    std::size_t decodePlain(const char* buffer, std::size_t size);
+    std::size_t encodePlain(char* buffer, std::size_t size);
+    bool canEncodePlain();
 
     std::size_t readProtocolHeader(const char* buffer, std::size_t size);
     std::size_t writeProtocolHeader(char* buffer, std::size_t size);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1518233&r1=1518232&r2=1518233&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Wed Aug 28 
14:56:46 2013
@@ -89,8 +89,18 @@ const std::string& ReceiverContext::getS
 {
     return address.getName();
 }
-void ReceiverContext::verify(pn_terminus_t* source)
+void ReceiverContext::verify()
 {
+    pn_terminus_t* source = pn_link_remote_source(receiver);
+    if (!pn_terminus_get_address(source)) {
+        std::string msg("No such source : ");
+        msg += getSource();
+        QPID_LOG(debug, msg);
+        throw qpid::messaging::NotFound(msg);
+    } else if (AddressImpl::isTemporary(address)) {
+        address.setName(pn_terminus_get_address(source));
+        QPID_LOG(debug, "Dynamic source name set to " << address.getName());
+    }
     helper.checkAssertion(source, AddressHelper::FOR_RECEIVER);
 }
 void ReceiverContext::configure()
@@ -118,6 +128,10 @@ bool ReceiverContext::isClosed() const
     return false;//TODO
 }
 
-
+void ReceiverContext::reset(pn_session_t* session)
+{
+    receiver = pn_receiver(session, name.c_str());
+    configure();
+}
 
 }}} // namespace qpid::messaging::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1518233&r1=1518232&r2=1518233&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Wed Aug 28 
14:56:46 2013
@@ -46,6 +46,7 @@ class ReceiverContext
   public:
     ReceiverContext(pn_session_t* session, const std::string& name, const 
qpid::messaging::Address& source);
     ~ReceiverContext();
+    void reset(pn_session_t* session);
     void setCapacity(uint32_t);
     uint32_t getCapacity();
     uint32_t getAvailable();
@@ -56,7 +57,7 @@ class ReceiverContext
     const std::string& getSource() const;
     bool isClosed() const;
     void configure();
-    void verify(pn_terminus_t*);
+    void verify();
     Address getAddress() const;
   private:
     friend class ConnectionContext;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1518233&r1=1518232&r2=1518233&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Wed Aug 28 
14:56:46 2013
@@ -82,6 +82,7 @@ const std::string& SenderContext::getTar
 
 bool SenderContext::send(const qpid::messaging::Message& message, 
SenderContext::Delivery** out)
 {
+    resend();//if there are any messages needing to be resent at the front of 
the queue, send them first
     if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
         if (unreliable) {
             Delivery delivery(nextId++);
@@ -424,7 +425,12 @@ bool changedSubject(const qpid::messagin
 
 }
 
-SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {}
+SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0), 
presettled(false) {}
+
+void SenderContext::Delivery::reset()
+{
+    token = 0;
+}
 
 void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, 
const qpid::messaging::Address& address)
 {
@@ -490,13 +496,20 @@ void SenderContext::Delivery::send(pn_li
     tag.bytes = reinterpret_cast<const char*>(&id);
     token = pn_delivery(sender, tag);
     pn_link_send(sender, encoded.getData(), encoded.getSize());
-    if (unreliable) pn_delivery_settle(token);
+    if (unreliable) {
+        pn_delivery_settle(token);
+        presettled = true;
+    }
     pn_link_advance(sender);
 }
 
+bool SenderContext::Delivery::sent() const
+{
+    return presettled || token;
+}
 bool SenderContext::Delivery::delivered()
 {
-    if (pn_delivery_remote_state(token) || pn_delivery_settled(token)) {
+    if (presettled || (token && (pn_delivery_remote_state(token) || 
pn_delivery_settled(token)))) {
         //TODO: need a better means for signalling outcomes other than accepted
         if (rejected()) {
             QPID_LOG(warning, "delivery " << id << " was rejected by peer");
@@ -520,8 +533,19 @@ void SenderContext::Delivery::settle()
 {
     pn_delivery_settle(token);
 }
-void SenderContext::verify(pn_terminus_t* target)
+void SenderContext::verify()
 {
+    pn_terminus_t* target = pn_link_remote_target(sender);
+    if (!pn_terminus_get_address(target)) {
+        std::string msg("No such target : ");
+        msg += getTarget();
+        QPID_LOG(debug, msg);
+        throw qpid::messaging::NotFound(msg);
+    } else if (AddressImpl::isTemporary(address)) {
+        address.setName(pn_terminus_get_address(target));
+        QPID_LOG(debug, "Dynamic target name set to " << address.getName());
+    }
+
     helper.checkAssertion(target, AddressHelper::FOR_SENDER);
 }
 void SenderContext::configure()
@@ -549,4 +573,22 @@ Address SenderContext::getAddress() cons
     return address;
 }
 
+
+void SenderContext::reset(pn_session_t* session)
+{
+    sender = pn_sender(session, name.c_str());
+    configure();
+
+    for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); 
++i) {
+        i->reset();
+    }
+}
+
+void SenderContext::resend()
+{
+    for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && 
pn_link_credit(sender) && !i->sent(); ++i) {
+        i->send(sender, false/*only resend reliable transfers*/);
+    }
+}
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1518233&r1=1518232&r2=1518233&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Wed Aug 28 
14:56:46 2013
@@ -57,14 +57,18 @@ class SenderContext
         bool accepted();
         bool rejected();
         void settle();
+        void reset();
+        bool sent() const;
       private:
         int32_t id;
         pn_delivery_t* token;
         EncodedMessage encoded;
+        bool presettled;
     };
 
     SenderContext(pn_session_t* session, const std::string& name, const 
qpid::messaging::Address& target);
     ~SenderContext();
+    void reset(pn_session_t* session);
     void close();
     void setCapacity(uint32_t);
     uint32_t getCapacity();
@@ -73,7 +77,7 @@ class SenderContext
     const std::string& getTarget() const;
     bool send(const qpid::messaging::Message& message, Delivery**);
     void configure();
-    void verify(pn_terminus_t*);
+    void verify();
     void check();
     bool settled();
     Address getAddress() const;
@@ -92,6 +96,7 @@ class SenderContext
 
     uint32_t processUnsettled(bool silent);
     void configure(pn_terminus_t*);
+    void resend();
 };
 }}} // namespace qpid::messaging::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1518233&r1=1518232&r2=1518233&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Wed Aug 28 
14:56:46 2013
@@ -163,4 +163,15 @@ std::string SessionContext::getName() co
     return name;
 }
 
+void SessionContext::reset(pn_connection_t* connection)
+{
+    session = pn_session(connection);
+    unacked.clear();
+    for (SessionContext::SenderMap::iterator i = senders.begin(); i != 
senders.end(); ++i) {
+        i->second->reset(session);
+    }
+    for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != 
receivers.end(); ++i) {
+        i->second->reset(session);
+    }
+}
 }}} // namespace qpid::messaging::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1518233&r1=1518232&r2=1518233&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Wed Aug 28 
14:56:46 2013
@@ -50,6 +50,7 @@ class SessionContext
   public:
     SessionContext(pn_connection_t*);
     ~SessionContext();
+    void reset(pn_connection_t*);
     boost::shared_ptr<SenderContext> createSender(const 
qpid::messaging::Address& address);
     boost::shared_ptr<ReceiverContext> createReceiver(const 
qpid::messaging::Address& address);
     boost::shared_ptr<SenderContext> getSender(const std::string& name) const;

Modified: qpid/trunk/qpid/cpp/src/tests/interlink_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/interlink_tests.py?rev=1518233&r1=1518232&r2=1518233&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/interlink_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/interlink_tests.py Wed Aug 28 14:56:46 2013
@@ -22,6 +22,7 @@ import os, signal, sys, time, imp, re, s
 import traceback
 from qpid.messaging import Message, SessionError, NotFound, ConnectionError, 
ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
 from brokertest import *
+from ha_test import HaPort
 from threading import Thread, Lock, Condition
 from logging import getLogger, WARN, ERROR, DEBUG, INFO
 from qpidtoollibs import BrokerAgent, BrokerObject
@@ -46,7 +47,8 @@ class AmqpBrokerTest(BrokerTest):
     def setUp(self):
         BrokerTest.setUp(self)
         os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib)
-        self.broker = self.amqp_broker()
+        self.port_holder = HaPort(self)
+        self.broker = self.amqp_broker(port_holder=self.port_holder)
         self.default_config = Config(self.broker)
         self.agent = BrokerAgent(self.broker.connect())
 
@@ -252,14 +254,73 @@ class AmqpBrokerTest(BrokerTest):
         #send to q on broker B through brokerA
         self.send_and_receive(send_config=Config(self.broker, 
address="q@BrokerB"), recv_config=Config(brokerB))
 
+    def test_reconnect(self):
+        receiver_cmd = ["qpid-receive",
+               "--broker", self.broker.host_port(),
+               "--address=amq.fanout",
+               "--connection-options={protocol:amqp1.0, 
reconnect:True,container_id:receiver}",
+               "--timeout=10", "--print-content=true", "--print-headers=false"
+               ]
+        receiver = self.popen(receiver_cmd, stdout=PIPE)
+
+        sender_cmd = ["qpid-send",
+               "--broker", self.broker.host_port(),
+               "--address=amq.fanout",
+               
"--connection-options={protocol:amqp1.0,reconnect:True,container_id:sender}",
+               "--content-stdin", "--send-eos=1"
+               ]
+        sender = self.popen(sender_cmd, stdin=PIPE)
+        sender._set_cloexec_flag(sender.stdin) #required for older python, see 
http://bugs.python.org/issue4112
+
+
+        batch1 = ["message-%s" % (i+1) for i in range(10000)]
+        for m in batch1:
+            sender.stdin.write(m + "\n")
+            sender.stdin.flush()
+
+        self.broker.kill()
+        self.broker = self.amqp_broker(port_holder=self.port_holder)
+
+        batch2 = ["message-%s" % (i+1) for i in range(10000, 20000)]
+        for m in batch2:
+            sender.stdin.write(m + "\n")
+            sender.stdin.flush()
+
+        sender.stdin.close()
+
+        last = None
+        m = receiver.stdout.readline().rstrip()
+        while len(m):
+            last = m
+            m = receiver.stdout.readline().rstrip()
+        assert last == "message-20000", (last)
+
     """ Create and return a broker with AMQP 1.0 support """
     def amqp_broker(self):
         assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+        self.port_holder = HaPort(self) #reserve port
         args = ["--load-module", BrokerTest.amqp_lib,
-                "--max-negotiate-time=600000",
+                "--socket-fd=%s" % self.port_holder.fileno,
+                "--listen-disable=tcp",
                 "--log-enable=trace+:Protocol",
                 "--log-enable=info+"]
-        return BrokerTest.broker(self, args)
+        return BrokerTest.broker(self, args, port=self.port_holder.port)
+
+    def amqp_broker(self, port_holder=None):
+        assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+        if port_holder:
+            args = ["--load-module", BrokerTest.amqp_lib,
+                    "--socket-fd=%s" % port_holder.fileno,
+                    "--listen-disable=tcp",
+                    "--log-enable=trace+:Protocol",
+                    "--log-enable=info+"]
+            return BrokerTest.broker(self, args, port=port_holder.port)
+        else:
+            args = ["--load-module", BrokerTest.amqp_lib,
+                    "--log-enable=trace+:Protocol",
+                    "--log-enable=info+"]
+            return BrokerTest.broker(self, args)
+
 
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to