Author: gsim
Date: Fri Sep 20 15:44:05 2013
New Revision: 1525044

URL: http://svn.apache.org/r1525044
Log:
QPID-5150: fixes for QMFv2 over AMQP 1.0

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1525044&r1=1525043&r2=1525044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Fri Sep 20 
15:44:05 2013
@@ -41,6 +41,7 @@ const std::string EMPTY;
 const std::string FORWARD_SLASH("/");
 const std::string TEXT_PLAIN("text/plain");
 const std::string SUBJECT_KEY("qpid.subject");
+const std::string APP_ID("x-amqp-0-10.app-id");
 
 qpid::framing::ReplyTo translate(const std::string address, Broker* broker)
 {
@@ -253,6 +254,10 @@ boost::intrusive_ptr<const qpid::broker:
             if (ap) {
                 qpid::amqp::Decoder d(ap.data, ap.size);
                 qpid::amqp_0_10::translate(d.readMap(), 
props->getApplicationHeaders());
+                std::string appid = 
props->getApplicationHeaders().getAsString(APP_ID);
+                if (!appid.empty()) {
+                    props->setAppId(appid);
+                }
             }
 
             qpid::framing::DeliveryProperties* dp =

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1525044&r1=1525043&r2=1525044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Sep 20 
15:44:05 2013
@@ -39,6 +39,7 @@
 #include "qpid/sys/PollableQueue.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/AclModule.h"
+#include "qpid/broker/Protocol.h"
 #include "qpid/types/Variant.h"
 #include "qpid/types/Uuid.h"
 #include "qpid/framing/List.h"
@@ -169,7 +170,7 @@ ManagementAgent::RemoteAgent::~RemoteAge
 }
 
 ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
-    threadPoolSize(1), publish(true), interval(10), broker(0), timer(0),
+    threadPoolSize(1), publish(true), interval(10), broker(0), timer(0), 
protocols(0),
     startTime(sys::now()),
     suppressed(false), disallowAllV1Methods(false),
     vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
@@ -221,6 +222,7 @@ void ManagementAgent::configure(const st
     timer          = &broker->getTimer();
     timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, 
this), timer, interval));
 
+    protocols = &broker->getProtocolRegistry();
     // Get from file or generate and save to file.
     if (dataDir.empty())
     {
@@ -2132,9 +2134,9 @@ bool ManagementAgent::authorizeAgentMess
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
-    qpid::broker::amqp_0_10::MessageTransfer& 
transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
+    boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> 
transfer = protocols->translate(msg);
     const framing::MessageProperties* p =
-        transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+        transfer ? 
transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
 
     const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
 
@@ -2229,9 +2231,9 @@ bool ManagementAgent::authorizeAgentMess
 
         // authorization failed, send reply if replyTo present
 
-        qpid::broker::amqp_0_10::MessageTransfer& 
transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
+        boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> 
transfer = protocols->translate(msg);
         const framing::MessageProperties* p =
-            
transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+            transfer ? 
transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
         if (p && p->hasReplyTo()) {
             const framing::ReplyTo& rt = p->getReplyTo();
             string rte = rt.getExchange();
@@ -2266,9 +2268,10 @@ void ManagementAgent::dispatchAgentComma
 {
     string   rte;
     string   rtk;
-    qpid::broker::amqp_0_10::MessageTransfer& 
transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
-    const framing::MessageProperties* p =
-        transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+
+    boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> 
transfer = protocols->translate(msg);
+    const framing::MessageProperties* p = transfer ?
+        transfer->getFrames().getHeaders()->get<framing::MessageProperties>() 
: 0;
     if (p && p->hasReplyTo()) {
         const framing::ReplyTo& rt = p->getReplyTo();
         rte = rt.getExchange();

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1525044&r1=1525043&r2=1525044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Fri Sep 20 
15:44:05 2013
@@ -44,6 +44,7 @@
 namespace qpid {
 namespace broker {
 class Connection;
+class ProtocolRegistry;
 }
 namespace sys {
 class Timer;
@@ -256,6 +257,7 @@ private:
     uint16_t                     interval;
     qpid::broker::Broker*        broker;
     qpid::sys::Timer*            timer;
+    qpid::broker::ProtocolRegistry* protocols;
     uint16_t                     bootSequence;
     uint32_t                     nextObjectId;
     uint32_t                     brokerBank;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1525044&r1=1525043&r2=1525044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Fri Sep 20 
15:44:05 2013
@@ -134,6 +134,7 @@ namespace {
 const std::string X_AMQP("x-amqp-");
 const std::string X_AMQP_FIRST_ACQUIRER("x-amqp-first-acquirer");
 const std::string X_AMQP_DELIVERY_COUNT("x-amqp-delivery-count");
+const std::string X_AMQP_0_10_APP_ID("x-amqp-0-10.app-id");
 
 class HeaderAdapter : public qpid::amqp::MessageEncoder::Header
 {
@@ -353,7 +354,7 @@ class ApplicationPropertiesAdapter : pub
     {
         for (qpid::types::Variant::Map::const_iterator i = headers.begin(); i 
!= headers.end(); ++i) {
             //strip out values with special keys as they are sent in standard 
fields
-            if (!startsWith(i->first, X_AMQP)) {
+            if (!startsWith(i->first, X_AMQP) || i->first == 
X_AMQP_0_10_APP_ID) {
                 qpid::amqp::CharSequence key(convert(i->first));
                 switch (i->second.getType()) {
                   case qpid::types::VAR_VOID:



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to