Author: gsim
Date: Wed Apr 16 17:58:58 2014
New Revision: 1588000
URL: http://svn.apache.org/r1588000
Log:
QPID-5706: add optional domain to incoming and outgoing link objects
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Wed Apr 16 17:58:58
2014
@@ -66,8 +66,8 @@ void Connection::trace(const char* messa
QPID_LOG_CAT(trace, protocol, "[" << id << "]: " << message);
}
-Connection::Connection(qpid::sys::OutputControl& o, const std::string& i,
BrokerContext& b, bool saslInUse)
- : BrokerContext(b), ManagedConnection(getBroker(), i),
+Connection::Connection(qpid::sys::OutputControl& o, const std::string& i,
BrokerContext& b, bool saslInUse, bool brokerInitiated)
+ : BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated),
connection(pn_connection()),
transport(pn_transport()),
out(o), id(i), haveOutput(true), closeInitiated(false),
closeRequested(false)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h Wed Apr 16 17:58:58
2014
@@ -46,7 +46,7 @@ class Session;
class Connection : public BrokerContext, public sys::ConnectionCodec, public
ManagedConnection
{
public:
- Connection(qpid::sys::OutputControl& out, const std::string& id,
BrokerContext& context, bool saslInUse);
+ Connection(qpid::sys::OutputControl& out, const std::string& id,
BrokerContext& context, bool saslInUse, bool brokerInitiated);
virtual ~Connection();
size_t decode(const char* buffer, size_t size);
virtual size_t encode(char* buffer, size_t size);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp Wed Apr 16 17:58:58 2014
@@ -31,6 +31,7 @@
#include "qpid/management/ManagementAgent.h"
#include <boost/shared_ptr.hpp>
#include <boost/lexical_cast.hpp>
+#include <sstream>
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -176,7 +177,7 @@ qpid::sys::ConnectionCodec* Interconnect
qpid::sys::ConnectionCodec*
InterconnectFactory::create(qpid::sys::OutputControl& out, const std::string&
id, const qpid::sys::SecuritySettings& t)
{
bool useSasl = domain->getMechanisms() != NONE;
- boost::shared_ptr<Interconnect> connection(new Interconnect(out, id,
*this, useSasl, incoming, name, source, target));
+ boost::shared_ptr<Interconnect> connection(new Interconnect(out, id,
*this, useSasl, incoming, name, source, target, domain->getName()));
if (!relay) getInterconnects().add(name, connection);
else connection->setRelay(relay);
@@ -199,7 +200,9 @@ bool InterconnectFactory::connect()
next++;
hostname = address.host;
QPID_LOG (info, "Inter-broker connection initiated (" << address << ")");
- getBroker().connect(name, address.host,
boost::lexical_cast<std::string>(address.port), address.protocol, this,
boost::bind(&InterconnectFactory::failed, this, _1, _2));
+ std::stringstream identifier;
+ identifier << name << "@" << domain->getName();
+ getBroker().connect(identifier.str(), address.host,
boost::lexical_cast<std::string>(address.port), address.protocol, this,
boost::bind(&InterconnectFactory::failed, this, _1, _2));
return true;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp Wed Apr 16
17:58:58 2014
@@ -28,6 +28,7 @@
#include "qpid/SaslFactory.h"
#include "qpid/sys/ConnectionCodec.h"
#include "qpid/sys/OutputControl.h"
+#include "qpid/sys/SystemInfo.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
extern "C" {
@@ -40,9 +41,9 @@ namespace broker {
namespace amqp {
Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string&
id, BrokerContext& broker, bool saslInUse,
- bool i, const std::string& n, const std::string& s,
const std::string& t)
- : Connection(out, id, broker, true), incoming(i), name(n), source(s),
target(t), headerDiscarded(saslInUse),
- closeRequested(false), isTransportDeleted(false)
+ bool i, const std::string& n, const std::string& s,
const std::string& t, const std::string& d)
+ : Connection(out, id, broker, true, true), incoming(i), name(n),
source(s), target(t), domain(d), headerDiscarded(saslInUse),
+ isOpened(false), closeRequested(false), isTransportDeleted(false)
{}
Interconnect::~Interconnect()
@@ -74,6 +75,32 @@ size_t Interconnect::encode(char* buffer
}
}
+namespace {
+const std::string CLIENT_PROCESS_NAME("qpid.client_process");
+const std::string CLIENT_PID("qpid.client_pid");
+pn_bytes_t convert(const std::string& s)
+{
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+void setProperties(pn_connection_t* connection)
+{
+ pn_data_t* data = pn_connection_properties(connection);
+ pn_data_put_map(data);
+ pn_data_enter(data);
+
+ pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME));
+ std::string processName = sys::SystemInfo::getProcessName();
+ pn_data_put_string(data, convert(processName));
+
+ pn_data_put_symbol(data, convert(CLIENT_PID));
+ pn_data_put_int(data, sys::SystemInfo::getProcessId());
+ pn_data_exit(data);
+}
+}
+
void Interconnect::process()
{
QPID_LOG(trace, id << " processing interconnect");
@@ -81,8 +108,23 @@ void Interconnect::process()
close();
} else {
if ((pn_connection_state(connection) & UNINIT) == UNINIT) {
- QPID_LOG_CAT(debug, model, id << " interconnect opened");
- open();
+ QPID_LOG_CAT(debug, model, id << " interconnect open initiated");
+ pn_connection_set_container(connection,
getBroker().getFederationTag().c_str());
+ setProperties(connection);
+ pn_connection_open(connection);
+ out.connectionEstablished();
+ setInterconnectDomain(domain);
+ }
+ if (!isOpened && (pn_connection_state(connection) & PN_REMOTE_ACTIVE))
{
+ QPID_LOG_CAT(debug, model, id << " interconnect open completed,
attaching link");
+ isOpened = true;
+ readPeerProperties();
+ const char*
containerid(pn_connection_remote_container(connection));
+ if (containerid) {
+ setContainerId(std::string(containerid));
+ }
+ opened();
+ getBroker().getConnectionObservers().opened(*this);
pn_session_t* s = pn_session(connection);
pn_session_open(s);
boost::shared_ptr<Session> ssn(new Session(s, *this, out));
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h Wed Apr 16 17:58:58
2014
@@ -38,7 +38,8 @@ class Interconnect : public Connection
{
public:
Interconnect(qpid::sys::OutputControl& out, const std::string& id,
BrokerContext& broker, bool saslInUse,
- bool incoming, const std::string& name, const std::string&
source, const std::string& target);
+ bool incoming, const std::string& name,
+ const std::string& source, const std::string& target, const
std::string& domain);
void setRelay(boost::shared_ptr<Relay>);
~Interconnect();
size_t encode(char* buffer, size_t size);
@@ -50,8 +51,10 @@ class Interconnect : public Connection
std::string name;
std::string source;
std::string target;
+ std::string domain;
bool headerDiscarded;
boost::shared_ptr<Relay> relay;
+ bool isOpened;
bool closeRequested;
bool isTransportDeleted;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp Wed Apr 16
17:58:58 2014
@@ -47,14 +47,14 @@ template <typename T> T getProperty(cons
}
}
}
-ManagedConnection::ManagedConnection(Broker& broker, const std::string i) :
id(i), agent(0)
+ManagedConnection::ManagedConnection(Broker& broker, const std::string i, bool
brokerInitiated) : id(i), agent(0)
{
//management integration:
agent = broker.getManagementAgent();
if (agent != 0) {
qpid::management::Manageable* parent = broker.GetVhostObject();
// TODO set last bool true if system connection
- connection = _qmf::Connection::shared_ptr(new _qmf::Connection(agent,
this, parent, id, true, false, "AMQP 1.0"));
+ connection = _qmf::Connection::shared_ptr(new _qmf::Connection(agent,
this, parent, id, !brokerInitiated, brokerInitiated, "AMQP 1.0"));
connection->set_shadow(false);
agent->addObject(connection);
}
@@ -132,6 +132,15 @@ const std::string& ManagedConnection::ge
return containerid;
}
+void ManagedConnection::setInterconnectDomain(const std::string& d)
+{
+ domain = d;
+}
+const std::string& ManagedConnection::getInterconnectDomain() const
+{
+ return domain;
+}
+
qpid::management::ManagementObject::shared_ptr
ManagedConnection::GetManagementObject() const
{
return connection;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h Wed Apr 16
17:58:58 2014
@@ -39,7 +39,7 @@ namespace amqp {
class ManagedConnection : public qpid::management::Manageable, public
OwnershipToken, public qpid::broker::Connection
{
public:
- ManagedConnection(Broker& broker, const std::string id);
+ ManagedConnection(Broker& broker, const std::string id, bool
brokerInitiated);
virtual ~ManagedConnection();
virtual void setUserId(const std::string&);
std::string getId() const;
@@ -47,6 +47,8 @@ class ManagedConnection : public qpid::m
void setSaslSsf(int);
void setContainerId(const std::string&);
const std::string& getContainerId() const;
+ void setInterconnectDomain(const std::string&);
+ const std::string& getInterconnectDomain() const;
void setPeerProperties(std::map<std::string, types::Variant>&);
qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
bool isLocal(const OwnershipToken* t) const;
@@ -70,6 +72,7 @@ class ManagedConnection : public qpid::m
const std::string id;
std::string userid;
std::string containerid;
+ std::string domain;
qmf::org::apache::qpid::broker::Connection::shared_ptr connection;
qpid::management::ManagementAgent* agent;
std::map<std::string, types::Variant> peerProperties;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp Wed Apr 16
17:58:58 2014
@@ -36,7 +36,8 @@ ManagedIncomingLink::ManagedIncomingLink
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this,
&parent, parent.getParent().getContainerId(), _name, source, target));
+ incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this,
&parent, parent.getParent().getContainerId(), _name, source, target,
+
parent.getParent().getInterconnectDomain()));
agent->addObject(incoming);
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp Wed Apr 16
17:58:58 2014
@@ -36,7 +36,8 @@ ManagedOutgoingLink::ManagedOutgoingLink
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this,
&parent, parent.getParent().getContainerId(), _name, source, target));
+ outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this,
&parent, parent.getParent().getContainerId(), _name, source, target,
+
parent.getParent().getInterconnectDomain()));
agent->addObject(outgoing);
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp Wed Apr 16
17:58:58 2014
@@ -126,7 +126,7 @@ qpid::sys::ConnectionCodec* ProtocolImpl
throw qpid::Exception("SASL layer required!");
} else {
QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)");
- return new qpid::broker::amqp::Connection(out, id, *this,
false);
+ return new qpid::broker::amqp::Connection(out, id, *this,
false, false);
}
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp Wed Apr 16 17:58:58 2014
@@ -32,7 +32,7 @@ namespace broker {
namespace amqp {
Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, BrokerContext&
context, std::auto_ptr<qpid::SaslServer> auth)
- : qpid::amqp::SaslServer(id), out(o), connection(out, id, context, true),
+ : qpid::amqp::SaslServer(id), out(o), connection(out, id, context, true,
false),
authenticator(auth),
state(INCOMPLETE), writeHeader(true), haveOutput(true)
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml Wed Apr 16
17:58:58 2014
@@ -399,6 +399,7 @@
<property name="name" type="sstr" access="RC" index="y"/>
<property name="source" type="sstr" access="RC"/>
<property name="target" type="sstr" access="RC"/>
+ <property name="domain" type="sstr" access="RC"/>
<statistic name="transfers" type="count64" unit="message"
desc="Messages transfered"/>
</class>
<!--
@@ -412,6 +413,7 @@
<property name="name" type="sstr" access="RC" index="y"/>
<property name="source" type="sstr" access="RC"/>
<property name="target" type="sstr" access="RC"/>
+ <property name="domain" type="sstr" access="RC"/>
<statistic name="transfers" type="count64" unit="message"
desc="Messages transfered"/>
</class>
<!--
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]