Author: gsim
Date: Wed Mar  4 13:22:03 2009
New Revision: 750002

URL: http://svn.apache.org/viewvc?rev=750002&view=rev
Log:
QPID-1711: Ensure the session state between the two peers in an inter-broker 
bridging session are kept in sync.

(Also made changes to cancellation to ensure that the commands are only issued 
on the io thread of the connection)


Modified:
    qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h

Modified: qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Wed Mar  4 13:22:03 2009
@@ -183,6 +183,7 @@
 }
 
 bool SessionState::receiverRecord(const AMQFrame& f) {
+    if (receiverTrackingDisabled) return true; //Very nasty hack for push 
bridges
     if (isControl(f)) return true; // Ignore control frames.
     stateful = true;
     receiver.expected.advance(f);
@@ -198,6 +199,7 @@
 }
     
 void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
+    if (receiverTrackingDisabled) return; //Very nasty hack for push bridges
     assert(receiver.incomplete.contains(command)); // Internal error to 
complete command twice.
     SequenceNumber first =cumulative ? receiver.incomplete.front() : command;
     SequenceNumber last = command;
@@ -237,7 +239,7 @@
     replayFlushLimit(flush), replayHardLimit(hard) {}
 
 SessionState::SessionState(const SessionId& i, const Configuration& c)
-    : id(i), timeout(), config(c), stateful()
+    : id(i), timeout(), config(c), stateful(), receiverTrackingDisabled(false)
 {
     QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
 }
@@ -275,4 +277,7 @@
     receiver.bytesSinceKnownCompleted = 0;
 }
 
+void SessionState::disableReceiverTracking() { receiverTrackingDisabled = 
true; }
+void SessionState::enableReceiverTracking() { receiverTrackingDisabled = 
false; }
+
 } // namespace qpid

Modified: qpid/trunk/qpid/cpp/src/qpid/SessionState.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/SessionState.h?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/SessionState.h Wed Mar  4 13:22:03 2009
@@ -181,6 +181,20 @@
         const SequenceSet& receivedIncomplete
     );
 
+    /**
+     * So called 'push' bridges work by faking a subscribe request
+     * (and the accompanyingflows etc) to the local broker to initiate
+     * the outflow of messages for the bridge.
+     * 
+     * As the peer doesn't send these it cannot include them in its
+     * session state. To keep the session state on either side of the
+     * bridge in sync, this hack allows the tracking of state for
+     * received messages to be disabled for the faked commands and
+     * subsequently re-enabled.
+     */
+    void disableReceiverTracking();
+    void enableReceiverTracking();
+
   private:
 
     struct SendState {
@@ -209,6 +223,7 @@
     uint32_t timeout;
     Configuration config;
     bool stateful;
+    bool receiverTrackingDisabled;//very nasty hack for 'push' bridges
 };
 
 inline bool operator==(const SessionId& id, const SessionState& s) { return s 
== id; }

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Wed Mar  4 
13:22:03 2009
@@ -277,7 +277,6 @@
 }
 
 void SessionHandler::sendAttach(bool force) {
-    CHECK_ATTACHED("session.send-attach");
     QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << 
getState()->getId());
     peer.attach(getState()->getId().getName(), force);
     if (getState()->hasState())

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Wed Mar  4 13:22:03 2009
@@ -22,6 +22,7 @@
 #include "ConnectionState.h"
 #include "Connection.h"
 #include "LinkRegistry.h"
+#include "SessionState.h"
 
 #include "qpid/agent/ManagementAgent.h"
 #include "qpid/framing/FieldTable.h"
@@ -80,31 +81,31 @@
     mgmtObject->resourceDestroy(); 
 }
 
-void Bridge::create(ConnectionState& c)
+void Bridge::create(Connection& c)
 {
+    connState = &c;
     FieldTable options;
     if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
-    connState = &c;
+    SessionHandler& sessionHandler = c.getChannel(id);
     if (args.i_srcIsLocal) {
         if (args.i_dynamic)
             throw Exception("Dynamic routing not supported for push routes");
         // Point the bridging commands at the local connection handler
-        Connection* conn = dynamic_cast<Connection*>(&c);
-        if (conn == 0)
-            return;
-        pushHandler.reset(new PushHandler(conn));
+        pushHandler.reset(new PushHandler(&c));
         channelHandler.reset(new framing::ChannelHandler(id, 
pushHandler.get()));
+
+        session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
+        peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
+        
+        session->attach(name, false);
+        session->commandPoint(0,0);
     } else {
+        sessionHandler.attachAs(name);
         // Point the bridging commands at the remote peer broker
-        channelHandler.reset(new framing::ChannelHandler(id, 
&(connState->getOutput())));
+        peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
     }
 
-    session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
-    peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
-
-    session->attach(name, false);
-    session->commandPoint(0,0);
-        
+    if (args.i_srcIsLocal) 
sessionHandler.getSession()->disableReceiverTracking();
     if (args.i_srcIsQueue) {        
         peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 
: 1, 0, false, "", 0, options);
         peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
@@ -116,7 +117,7 @@
         if (args.i_tag.size()) {
             queueSettings.setString("qpid.trace.id", args.i_tag);
         } else {
-            const string& peerTag = connState->getFederationPeerTag();
+            const string& peerTag = c.getFederationPeerTag();
             if (peerTag.size())
                 queueSettings.setString("qpid.trace.id", peerTag);
         }
@@ -129,7 +130,7 @@
                 queueSettings.setString("qpid.trace.exclude", localTag);
         }
 
-        bool durable = false;//should this be an arg, or would be use 
srcIsQueue for durable queues?
+        bool durable = false;//should this be an arg, or would we use 
srcIsQueue for durable queues?
         bool autoDelete = !durable;//auto delete transient queues?
         peer->getQueue().declare(queueName, "", false, durable, true, 
autoDelete, queueSettings);
         if (!args.i_dynamic)
@@ -148,12 +149,23 @@
             QPID_LOG(debug, "Activated static route from exchange " << 
args.i_src << " to " << args.i_dest);
         }
     }
+    if (args.i_srcIsLocal) 
sessionHandler.getSession()->enableReceiverTracking();
 }
 
-void Bridge::cancel()
+void Bridge::cancel(Connection& c)
 {
+    if (args.i_srcIsLocal) {    
+        //recreate peer to be sure that the session handler reference
+        //is valid (it could have been deleted due to a detach)
+        SessionHandler& sessionHandler = c.getChannel(id);
+        peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
+    }
     peer->getMessage().cancel(args.i_dest);
     peer->getSession().detach(name);
+}
+
+void Bridge::closed()
+{
     if (args.i_dynamic) {
         Exchange::shared_ptr exchange = 
link->getBroker()->getExchanges().get(args.i_src);
         if (exchange.get() != 0)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Wed Mar  4 13:22:03 2009
@@ -52,8 +52,9 @@
            const qmf::org::apache::qpid::broker::ArgsLinkBridge& args);
     ~Bridge();
 
-    void create(ConnectionState& c);
-    void cancel();
+    void create(Connection& c);
+    void cancel(Connection& c);
+    void closed();
     void destroy();
     bool isDurable() { return args.i_durable; }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Wed Mar  4 13:22:03 2009
@@ -158,7 +158,7 @@
     }
 
     for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-        (*i)->cancel();
+        (*i)->closed();
         created.push_back(*i);
     }
     active.clear();
@@ -217,21 +217,27 @@
 
 void Link::cancel(Bridge::shared_ptr bridge)
 {
-    Mutex::ScopedLock mutex(lock);
-
-    for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
-        if ((*i).get() == bridge.get()) {
-            created.erase(i);
-            break;
+    {
+        Mutex::ScopedLock mutex(lock);
+        
+        for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+            if ((*i).get() == bridge.get()) {
+                created.erase(i);
+                break;
+            }
         }
-    }
-    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-        if ((*i).get() == bridge.get()) {
-            bridge->cancel();
-            active.erase(i);
-            break;
+        for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+            if ((*i).get() == bridge.get()) {
+                cancellations.push_back(bridge);
+                bridge->closed();
+                active.erase(i);
+                break;
+            }
         }
     }
+    if (!cancellations.empty()) {
+        connection->requestIOProcessing 
(boost::bind(&Link::ioThreadProcessing, this));
+    }
 }
 
 void Link::ioThreadProcessing()
@@ -242,7 +248,7 @@
         return;
     QPID_LOG(debug, "Link::ioThreadProcessing()");
 
-    //process any pending creates
+    //process any pending creates and/or cancellations
     if (!created.empty()) {
         for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
             active.push_back(*i);
@@ -250,6 +256,13 @@
         }
         created.clear();
     }
+    if (!cancellations.empty()) {
+        for (Bridges::iterator i = cancellations.begin(); i != 
cancellations.end(); ++i) {
+            active.push_back(*i);
+            (*i)->cancel(*connection);
+        }
+        cancellations.clear();
+    }
 }
 
 void Link::setConnection(Connection* c)
@@ -284,7 +297,7 @@
             }
         }
     }
-    else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
+    else if (state == STATE_OPERATIONAL && (!created.empty() || 
!cancellations.empty()) && connection != 0)
         connection->requestIOProcessing 
(boost::bind(&Link::ioThreadProcessing, this));
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Wed Mar  4 13:22:03 2009
@@ -67,6 +67,7 @@
             typedef std::vector<Bridge::shared_ptr> Bridges;
             Bridges created;   // Bridges pending creation
             Bridges active;    // Bridges active
+            Bridges cancellations;    // Bridges pending cancellation
             uint channelCounter;
             Connection* connection;
             management::ManagementAgent* agent;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed Mar  4 13:22:03 
2009
@@ -85,11 +85,23 @@
     if (session.get()) session->readyToSend();
 }
 
-// TODO aconway 2008-05-12: hacky - handle attached for bridge clients.
-// We need to integrate the client code so we can run a real client
-// in the bridge.
-// 
-void SessionHandler::attached(const std::string& name) {
+/**
+ * Used by inter-broker bridges to set up session id and attach
+ */
+void SessionHandler::attachAs(const std::string& name)
+{
+    SessionId id(connection.getUserId(), name);
+    SessionState::Configuration config = 
connection.broker.getSessionManager().getSessionConfig();
+    session.reset(new SessionState(connection.getBroker(), *this, id, config));
+    sendAttach(false);
+}
+
+/**
+ * TODO: this is a little ugly, fix it; its currently still relied on
+ * for 'push' bridges
+ */
+void SessionHandler::attached(const std::string& name)
+{
     if (session.get()) {
         amqp_0_10::SessionHandler::attached(name);
     } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Wed Mar  4 13:22:03 
2009
@@ -66,9 +66,8 @@
     }
 
     virtual void handleDetach();
-    
-    // Overrides
-    void attached(const std::string& name);
+    void attached(const std::string& name);//used by 'pushing' inter-broker 
bridges
+    void attachAs(const std::string& name);//used by 'pulling' inter-broker 
bridges
 
   protected:
     virtual void setState(const std::string& sessionName, bool force);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to