Author: gsim
Date: Wed Apr 16 17:58:58 2014
New Revision: 1588000

URL: http://svn.apache.org/r1588000
Log:
QPID-5706: add optional domain to incoming and outgoing link objects

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Wed Apr 16 17:58:58 2014
@@ -66,8 +66,8 @@ void Connection::trace(const char* messa
     QPID_LOG_CAT(trace, protocol, "[" << id << "]: " << message);
 }
 
-Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse)
-    : BrokerContext(b), ManagedConnection(getBroker(), i),
+Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse, bool brokerInitiated)
+    : BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated),
       connection(pn_connection()),
       transport(pn_transport()),
      out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h Wed Apr 16 17:58:58 2014
@@ -46,7 +46,7 @@ class Session;
 class Connection : public BrokerContext, public sys::ConnectionCodec, public ManagedConnection
 {
   public:
-    Connection(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, bool saslInUse);
+    Connection(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, bool saslInUse, bool brokerInitiated);
     virtual ~Connection();
     size_t decode(const char* buffer, size_t size);
     virtual size_t encode(char* buffer, size_t size);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Domain.cpp Wed Apr 16 17:58:58 2014
@@ -31,6 +31,7 @@
 #include "qpid/management/ManagementAgent.h"
 #include <boost/shared_ptr.hpp>
 #include <boost/lexical_cast.hpp>
+#include <sstream>
 
 namespace _qmf = qmf::org::apache::qpid::broker;
 
@@ -176,7 +177,7 @@ qpid::sys::ConnectionCodec* Interconnect
 qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& t)
 {
     bool useSasl = domain->getMechanisms() != NONE;
-    boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, *this, useSasl, incoming, name, source, target));
+    boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, *this, useSasl, incoming, name, source, target, domain->getName()));
     if (!relay) getInterconnects().add(name, connection);
     else connection->setRelay(relay);
 
@@ -199,7 +200,9 @@ bool InterconnectFactory::connect()
     next++;
     hostname = address.host;
     QPID_LOG (info, "Inter-broker connection initiated (" << address << ")");
-    getBroker().connect(name, address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2));
+    std::stringstream identifier;
+    identifier << name << "@" << domain->getName();
+    getBroker().connect(identifier.str(), address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2));
     return true;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp Wed Apr 16 17:58:58 2014
@@ -28,6 +28,7 @@
 #include "qpid/SaslFactory.h"
 #include "qpid/sys/ConnectionCodec.h"
 #include "qpid/sys/OutputControl.h"
+#include "qpid/sys/SystemInfo.h"
 #include "qpid/log/Statement.h"
 #include <boost/shared_ptr.hpp>
 extern "C" {
@@ -40,9 +41,9 @@ namespace broker {
 namespace amqp {
 
 Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse,
-                           bool i, const std::string& n, const std::string& s, const std::string& t)
-    : Connection(out, id, broker, true), incoming(i), name(n), source(s), target(t), headerDiscarded(saslInUse),
-      closeRequested(false), isTransportDeleted(false)
+                           bool i, const std::string& n, const std::string& s, const std::string& t, const std::string& d)
+    : Connection(out, id, broker, true, true), incoming(i), name(n), source(s), target(t), domain(d), headerDiscarded(saslInUse),
+      isOpened(false), closeRequested(false), isTransportDeleted(false)
 {}
 
 Interconnect::~Interconnect()
@@ -74,6 +75,32 @@ size_t Interconnect::encode(char* buffer
     }
 }
 
+namespace {
+const std::string CLIENT_PROCESS_NAME("qpid.client_process");
+const std::string CLIENT_PID("qpid.client_pid");
+pn_bytes_t convert(const std::string& s)
+{
+    pn_bytes_t result;
+    result.start = const_cast<char*>(s.data());
+    result.size = s.size();
+    return result;
+}
+void setProperties(pn_connection_t* connection)
+{
+    pn_data_t* data = pn_connection_properties(connection);
+    pn_data_put_map(data);
+    pn_data_enter(data);
+
+    pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME));
+    std::string processName = sys::SystemInfo::getProcessName();
+    pn_data_put_string(data, convert(processName));
+
+    pn_data_put_symbol(data, convert(CLIENT_PID));
+    pn_data_put_int(data, sys::SystemInfo::getProcessId());
+    pn_data_exit(data);
+}
+}
+
 void Interconnect::process()
 {
     QPID_LOG(trace, id << " processing interconnect");
@@ -81,8 +108,23 @@ void Interconnect::process()
         close();
     } else {
         if ((pn_connection_state(connection) & UNINIT) == UNINIT) {
-            QPID_LOG_CAT(debug, model, id << " interconnect opened");
-            open();
+            QPID_LOG_CAT(debug, model, id << " interconnect open initiated");
+            pn_connection_set_container(connection, getBroker().getFederationTag().c_str());
+            setProperties(connection);
+            pn_connection_open(connection);
+            out.connectionEstablished();
+            setInterconnectDomain(domain);
+        }
+        if (!isOpened && (pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+            QPID_LOG_CAT(debug, model, id << " interconnect open completed, attaching link");
+            isOpened = true;
+            readPeerProperties();
+            const char* containerid(pn_connection_remote_container(connection));
+            if (containerid) {
+                setContainerId(std::string(containerid));
+            }
+            opened();
+            getBroker().getConnectionObservers().opened(*this);
             pn_session_t* s = pn_session(connection);
             pn_session_open(s);
             boost::shared_ptr<Session> ssn(new Session(s, *this, out));

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Interconnect.h Wed Apr 16 17:58:58 2014
@@ -38,7 +38,8 @@ class Interconnect : public Connection
 {
   public:
     Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse,
-                 bool incoming, const std::string& name, const std::string& source, const std::string& target);
+                 bool incoming, const std::string& name,
+                 const std::string& source, const std::string& target, const std::string& domain);
     void setRelay(boost::shared_ptr<Relay>);
     ~Interconnect();
     size_t encode(char* buffer, size_t size);
@@ -50,8 +51,10 @@ class Interconnect : public Connection
     std::string name;
     std::string source;
     std::string target;
+    std::string domain;
     bool headerDiscarded;
     boost::shared_ptr<Relay> relay;
+    bool isOpened;
     bool closeRequested;
     bool isTransportDeleted;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp Wed Apr 16 17:58:58 2014
@@ -47,14 +47,14 @@ template <typename T> T getProperty(cons
     }
 }
 }
-ManagedConnection::ManagedConnection(Broker& broker, const std::string i) : id(i), agent(0)
+ManagedConnection::ManagedConnection(Broker& broker, const std::string i, bool brokerInitiated) : id(i), agent(0)
 {
     //management integration:
     agent = broker.getManagementAgent();
     if (agent != 0) {
         qpid::management::Manageable* parent = broker.GetVhostObject();
         // TODO set last bool true if system connection
-        connection = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, id, true, false, "AMQP 1.0"));
+        connection = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, id, !brokerInitiated, brokerInitiated, "AMQP 1.0"));
         connection->set_shadow(false);
         agent->addObject(connection);
     }
@@ -132,6 +132,15 @@ const std::string& ManagedConnection::ge
     return containerid;
 }
 
+void ManagedConnection::setInterconnectDomain(const std::string& d)
+{
+    domain = d;
+}
+const std::string& ManagedConnection::getInterconnectDomain() const
+{
+    return domain;
+}
+
 qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const
 {
     return connection;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h Wed Apr 16 17:58:58 2014
@@ -39,7 +39,7 @@ namespace amqp {
 class ManagedConnection : public qpid::management::Manageable, public OwnershipToken, public qpid::broker::Connection
 {
   public:
-    ManagedConnection(Broker& broker, const std::string id);
+    ManagedConnection(Broker& broker, const std::string id, bool brokerInitiated);
     virtual ~ManagedConnection();
     virtual void setUserId(const std::string&);
     std::string getId() const;
@@ -47,6 +47,8 @@ class ManagedConnection : public qpid::m
     void setSaslSsf(int);
     void setContainerId(const std::string&);
     const std::string& getContainerId() const;
+    void setInterconnectDomain(const std::string&);
+    const std::string& getInterconnectDomain() const;
     void setPeerProperties(std::map<std::string, types::Variant>&);
     qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
     bool isLocal(const OwnershipToken* t) const;
@@ -70,6 +72,7 @@ class ManagedConnection : public qpid::m
     const std::string id;
     std::string userid;
     std::string containerid;
+    std::string domain;
     qmf::org::apache::qpid::broker::Connection::shared_ptr connection;
     qpid::management::ManagementAgent* agent;
     std::map<std::string, types::Variant> peerProperties;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp Wed Apr 16 17:58:58 2014
@@ -36,7 +36,8 @@ ManagedIncomingLink::ManagedIncomingLink
 {
     qpid::management::ManagementAgent* agent = broker.getManagementAgent();
     if (agent) {
-        incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target));
+        incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target,
+                                                                 parent.getParent().getInterconnectDomain()));
         agent->addObject(incoming);
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp Wed Apr 16 17:58:58 2014
@@ -36,7 +36,8 @@ ManagedOutgoingLink::ManagedOutgoingLink
 {
     qpid::management::ManagementAgent* agent = broker.getManagementAgent();
     if (agent) {
-        outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target));
+        outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target,
+                                                                 parent.getParent().getInterconnectDomain()));
         agent->addObject(outgoing);
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp Wed Apr 16 17:58:58 2014
@@ -126,7 +126,7 @@ qpid::sys::ConnectionCodec* ProtocolImpl
                 throw qpid::Exception("SASL layer required!");
             } else {
                 QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)");
-                return new qpid::broker::amqp::Connection(out, id, *this, false);
+                return new qpid::broker::amqp::Connection(out, id, *this, false, false);
             }
         }
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp Wed Apr 16 17:58:58 2014
@@ -32,7 +32,7 @@ namespace broker {
 namespace amqp {
 
 Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, BrokerContext& context, std::auto_ptr<qpid::SaslServer> auth)
-    : qpid::amqp::SaslServer(id), out(o), connection(out, id, context, true),
+    : qpid::amqp::SaslServer(id), out(o), connection(out, id, context, true, false),
       authenticator(auth),
       state(INCOMPLETE), writeHeader(true), haveOutput(true)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml?rev=1588000&r1=1587999&r2=1588000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml Wed Apr 16 17:58:58 2014
@@ -399,6 +399,7 @@
     <property name="name"           type="sstr"     access="RC" index="y"/>
     <property name="source"         type="sstr"     access="RC"/>
     <property name="target"         type="sstr"     access="RC"/>
+    <property name="domain"         type="sstr"     access="RC"/>
     <statistic name="transfers"     type="count64"  unit="message" desc="Messages transfered"/>
   </class>
   <!--
@@ -412,6 +413,7 @@
     <property name="name"           type="sstr"     access="RC" index="y"/>
     <property name="source"         type="sstr"     access="RC"/>
     <property name="target"         type="sstr"     access="RC"/>
+    <property name="domain"         type="sstr"     access="RC"/>
     <statistic name="transfers"     type="count64"  unit="message" desc="Messages transfered"/>
   </class>
   <!--



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to