Author: gsim
Date: Wed Mar 4 13:22:03 2009
New Revision: 750002
URL: http://svn.apache.org/viewvc?rev=750002&view=rev
Log:
QPID-1711: Ensure the session state between the two peers in an inter-broker
bridging session are kept in sync.
(Also made changes to cancellation to ensure that the commands are only issued
on the io thread of the connection)
Modified:
qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
qpid/trunk/qpid/cpp/src/qpid/SessionState.h
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
Modified: qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Wed Mar 4 13:22:03 2009
@@ -183,6 +183,7 @@
}
bool SessionState::receiverRecord(const AMQFrame& f) {
+ if (receiverTrackingDisabled) return true; //Very nasty hack for push
bridges
if (isControl(f)) return true; // Ignore control frames.
stateful = true;
receiver.expected.advance(f);
@@ -198,6 +199,7 @@
}
void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
+ if (receiverTrackingDisabled) return; //Very nasty hack for push bridges
assert(receiver.incomplete.contains(command)); // Internal error to
complete command twice.
SequenceNumber first =cumulative ? receiver.incomplete.front() : command;
SequenceNumber last = command;
@@ -237,7 +239,7 @@
replayFlushLimit(flush), replayHardLimit(hard) {}
SessionState::SessionState(const SessionId& i, const Configuration& c)
- : id(i), timeout(), config(c), stateful()
+ : id(i), timeout(), config(c), stateful(), receiverTrackingDisabled(false)
{
QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
}
@@ -275,4 +277,7 @@
receiver.bytesSinceKnownCompleted = 0;
}
+void SessionState::disableReceiverTracking() { receiverTrackingDisabled =
true; }
+void SessionState::enableReceiverTracking() { receiverTrackingDisabled =
false; }
+
} // namespace qpid
Modified: qpid/trunk/qpid/cpp/src/qpid/SessionState.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/SessionState.h?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/SessionState.h Wed Mar 4 13:22:03 2009
@@ -181,6 +181,20 @@
const SequenceSet& receivedIncomplete
);
+ /**
+ * So called 'push' bridges work by faking a subscribe request
+ * (and the accompanyingflows etc) to the local broker to initiate
+ * the outflow of messages for the bridge.
+ *
+ * As the peer doesn't send these it cannot include them in its
+ * session state. To keep the session state on either side of the
+ * bridge in sync, this hack allows the tracking of state for
+ * received messages to be disabled for the faked commands and
+ * subsequently re-enabled.
+ */
+ void disableReceiverTracking();
+ void enableReceiverTracking();
+
private:
struct SendState {
@@ -209,6 +223,7 @@
uint32_t timeout;
Configuration config;
bool stateful;
+ bool receiverTrackingDisabled;//very nasty hack for 'push' bridges
};
inline bool operator==(const SessionId& id, const SessionState& s) { return s
== id; }
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Wed Mar 4
13:22:03 2009
@@ -277,7 +277,6 @@
}
void SessionHandler::sendAttach(bool force) {
- CHECK_ATTACHED("session.send-attach");
QPID_LOG(debug, "SessionHandler::sendAttach attach id=" <<
getState()->getId());
peer.attach(getState()->getId().getName(), force);
if (getState()->hasState())
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Wed Mar 4 13:22:03 2009
@@ -22,6 +22,7 @@
#include "ConnectionState.h"
#include "Connection.h"
#include "LinkRegistry.h"
+#include "SessionState.h"
#include "qpid/agent/ManagementAgent.h"
#include "qpid/framing/FieldTable.h"
@@ -80,31 +81,31 @@
mgmtObject->resourceDestroy();
}
-void Bridge::create(ConnectionState& c)
+void Bridge::create(Connection& c)
{
+ connState = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
- connState = &c;
+ SessionHandler& sessionHandler = c.getChannel(id);
if (args.i_srcIsLocal) {
if (args.i_dynamic)
throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
- Connection* conn = dynamic_cast<Connection*>(&c);
- if (conn == 0)
- return;
- pushHandler.reset(new PushHandler(conn));
+ pushHandler.reset(new PushHandler(&c));
channelHandler.reset(new framing::ChannelHandler(id,
pushHandler.get()));
+
+ session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
+ peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
+
+ session->attach(name, false);
+ session->commandPoint(0,0);
} else {
+ sessionHandler.attachAs(name);
// Point the bridging commands at the remote peer broker
- channelHandler.reset(new framing::ChannelHandler(id,
&(connState->getOutput())));
+ peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
}
- session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
- peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
-
- session->attach(name, false);
- session->commandPoint(0,0);
-
+ if (args.i_srcIsLocal)
sessionHandler.getSession()->disableReceiverTracking();
if (args.i_srcIsQueue) {
peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0
: 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
@@ -116,7 +117,7 @@
if (args.i_tag.size()) {
queueSettings.setString("qpid.trace.id", args.i_tag);
} else {
- const string& peerTag = connState->getFederationPeerTag();
+ const string& peerTag = c.getFederationPeerTag();
if (peerTag.size())
queueSettings.setString("qpid.trace.id", peerTag);
}
@@ -129,7 +130,7 @@
queueSettings.setString("qpid.trace.exclude", localTag);
}
- bool durable = false;//should this be an arg, or would be use
srcIsQueue for durable queues?
+ bool durable = false;//should this be an arg, or would we use
srcIsQueue for durable queues?
bool autoDelete = !durable;//auto delete transient queues?
peer->getQueue().declare(queueName, "", false, durable, true,
autoDelete, queueSettings);
if (!args.i_dynamic)
@@ -148,12 +149,23 @@
QPID_LOG(debug, "Activated static route from exchange " <<
args.i_src << " to " << args.i_dest);
}
}
+ if (args.i_srcIsLocal)
sessionHandler.getSession()->enableReceiverTracking();
}
-void Bridge::cancel()
+void Bridge::cancel(Connection& c)
{
+ if (args.i_srcIsLocal) {
+ //recreate peer to be sure that the session handler reference
+ //is valid (it could have been deleted due to a detach)
+ SessionHandler& sessionHandler = c.getChannel(id);
+ peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
+ }
peer->getMessage().cancel(args.i_dest);
peer->getSession().detach(name);
+}
+
+void Bridge::closed()
+{
if (args.i_dynamic) {
Exchange::shared_ptr exchange =
link->getBroker()->getExchanges().get(args.i_src);
if (exchange.get() != 0)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Wed Mar 4 13:22:03 2009
@@ -52,8 +52,9 @@
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args);
~Bridge();
- void create(ConnectionState& c);
- void cancel();
+ void create(Connection& c);
+ void cancel(Connection& c);
+ void closed();
void destroy();
bool isDurable() { return args.i_durable; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Wed Mar 4 13:22:03 2009
@@ -158,7 +158,7 @@
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- (*i)->cancel();
+ (*i)->closed();
created.push_back(*i);
}
active.clear();
@@ -217,21 +217,27 @@
void Link::cancel(Bridge::shared_ptr bridge)
{
- Mutex::ScopedLock mutex(lock);
-
- for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
- if ((*i).get() == bridge.get()) {
- created.erase(i);
- break;
+ {
+ Mutex::ScopedLock mutex(lock);
+
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
+ }
}
- }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if ((*i).get() == bridge.get()) {
- bridge->cancel();
- active.erase(i);
- break;
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ cancellations.push_back(bridge);
+ bridge->closed();
+ active.erase(i);
+ break;
+ }
}
}
+ if (!cancellations.empty()) {
+ connection->requestIOProcessing
(boost::bind(&Link::ioThreadProcessing, this));
+ }
}
void Link::ioThreadProcessing()
@@ -242,7 +248,7 @@
return;
QPID_LOG(debug, "Link::ioThreadProcessing()");
- //process any pending creates
+ //process any pending creates and/or cancellations
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
active.push_back(*i);
@@ -250,6 +256,13 @@
}
created.clear();
}
+ if (!cancellations.empty()) {
+ for (Bridges::iterator i = cancellations.begin(); i !=
cancellations.end(); ++i) {
+ active.push_back(*i);
+ (*i)->cancel(*connection);
+ }
+ cancellations.clear();
+ }
}
void Link::setConnection(Connection* c)
@@ -284,7 +297,7 @@
}
}
}
- else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
+ else if (state == STATE_OPERATIONAL && (!created.empty() ||
!cancellations.empty()) && connection != 0)
connection->requestIOProcessing
(boost::bind(&Link::ioThreadProcessing, this));
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Wed Mar 4 13:22:03 2009
@@ -67,6 +67,7 @@
typedef std::vector<Bridge::shared_ptr> Bridges;
Bridges created; // Bridges pending creation
Bridges active; // Bridges active
+ Bridges cancellations; // Bridges pending cancellation
uint channelCounter;
Connection* connection;
management::ManagementAgent* agent;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed Mar 4 13:22:03
2009
@@ -85,11 +85,23 @@
if (session.get()) session->readyToSend();
}
-// TODO aconway 2008-05-12: hacky - handle attached for bridge clients.
-// We need to integrate the client code so we can run a real client
-// in the bridge.
-//
-void SessionHandler::attached(const std::string& name) {
+/**
+ * Used by inter-broker bridges to set up session id and attach
+ */
+void SessionHandler::attachAs(const std::string& name)
+{
+ SessionId id(connection.getUserId(), name);
+ SessionState::Configuration config =
connection.broker.getSessionManager().getSessionConfig();
+ session.reset(new SessionState(connection.getBroker(), *this, id, config));
+ sendAttach(false);
+}
+
+/**
+ * TODO: this is a little ugly, fix it; its currently still relied on
+ * for 'push' bridges
+ */
+void SessionHandler::attached(const std::string& name)
+{
if (session.get()) {
amqp_0_10::SessionHandler::attached(name);
} else {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=750002&r1=750001&r2=750002&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Wed Mar 4 13:22:03
2009
@@ -66,9 +66,8 @@
}
virtual void handleDetach();
-
- // Overrides
- void attached(const std::string& name);
+ void attached(const std::string& name);//used by 'pushing' inter-broker
bridges
+ void attachAs(const std::string& name);//used by 'pulling' inter-broker
bridges
protected:
virtual void setState(const std::string& sessionName, bool force);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]