Author: gsim
Date: Fri Sep 20 15:44:05 2013
New Revision: 1525044
URL: http://svn.apache.org/r1525044
Log:
QPID-5150: fixes for QMFv2 over AMQP 1.0
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1525044&r1=1525043&r2=1525044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Fri Sep 20
15:44:05 2013
@@ -41,6 +41,7 @@ const std::string EMPTY;
const std::string FORWARD_SLASH("/");
const std::string TEXT_PLAIN("text/plain");
const std::string SUBJECT_KEY("qpid.subject");
+const std::string APP_ID("x-amqp-0-10.app-id");
qpid::framing::ReplyTo translate(const std::string address, Broker* broker)
{
@@ -253,6 +254,10 @@ boost::intrusive_ptr<const qpid::broker:
if (ap) {
qpid::amqp::Decoder d(ap.data, ap.size);
qpid::amqp_0_10::translate(d.readMap(),
props->getApplicationHeaders());
+ std::string appid =
props->getApplicationHeaders().getAsString(APP_ID);
+ if (!appid.empty()) {
+ props->setAppId(appid);
+ }
}
qpid::framing::DeliveryProperties* dp =
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1525044&r1=1525043&r2=1525044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Sep 20
15:44:05 2013
@@ -39,6 +39,7 @@
#include "qpid/sys/PollableQueue.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/broker/Protocol.h"
#include "qpid/types/Variant.h"
#include "qpid/types/Uuid.h"
#include "qpid/framing/List.h"
@@ -169,7 +170,7 @@ ManagementAgent::RemoteAgent::~RemoteAge
}
ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
- threadPoolSize(1), publish(true), interval(10), broker(0), timer(0),
+ threadPoolSize(1), publish(true), interval(10), broker(0), timer(0),
protocols(0),
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
@@ -221,6 +222,7 @@ void ManagementAgent::configure(const st
timer = &broker->getTimer();
timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing,
this), timer, interval));
+ protocols = &broker->getProtocolRegistry();
// Get from file or generate and save to file.
if (dataDir.empty())
{
@@ -2132,9 +2134,9 @@ bool ManagementAgent::authorizeAgentMess
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
- qpid::broker::amqp_0_10::MessageTransfer&
transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>
transfer = protocols->translate(msg);
const framing::MessageProperties* p =
- transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer ?
transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
@@ -2229,9 +2231,9 @@ bool ManagementAgent::authorizeAgentMess
// authorization failed, send reply if replyTo present
- qpid::broker::amqp_0_10::MessageTransfer&
transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>
transfer = protocols->translate(msg);
const framing::MessageProperties* p =
-
transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer ?
transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
string rte = rt.getExchange();
@@ -2266,9 +2268,10 @@ void ManagementAgent::dispatchAgentComma
{
string rte;
string rtk;
- qpid::broker::amqp_0_10::MessageTransfer&
transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
- const framing::MessageProperties* p =
- transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>
transfer = protocols->translate(msg);
+ const framing::MessageProperties* p = transfer ?
+ transfer->getFrames().getHeaders()->get<framing::MessageProperties>()
: 0;
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
rte = rt.getExchange();
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1525044&r1=1525043&r2=1525044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Fri Sep 20
15:44:05 2013
@@ -44,6 +44,7 @@
namespace qpid {
namespace broker {
class Connection;
+class ProtocolRegistry;
}
namespace sys {
class Timer;
@@ -256,6 +257,7 @@ private:
uint16_t interval;
qpid::broker::Broker* broker;
qpid::sys::Timer* timer;
+ qpid::broker::ProtocolRegistry* protocols;
uint16_t bootSequence;
uint32_t nextObjectId;
uint32_t brokerBank;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1525044&r1=1525043&r2=1525044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Fri Sep 20
15:44:05 2013
@@ -134,6 +134,7 @@ namespace {
const std::string X_AMQP("x-amqp-");
const std::string X_AMQP_FIRST_ACQUIRER("x-amqp-first-acquirer");
const std::string X_AMQP_DELIVERY_COUNT("x-amqp-delivery-count");
+const std::string X_AMQP_0_10_APP_ID("x-amqp-0-10.app-id");
class HeaderAdapter : public qpid::amqp::MessageEncoder::Header
{
@@ -353,7 +354,7 @@ class ApplicationPropertiesAdapter : pub
{
for (qpid::types::Variant::Map::const_iterator i = headers.begin(); i
!= headers.end(); ++i) {
//strip out values with special keys as they are sent in standard
fields
- if (!startsWith(i->first, X_AMQP)) {
+ if (!startsWith(i->first, X_AMQP) || i->first ==
X_AMQP_0_10_APP_ID) {
qpid::amqp::CharSequence key(convert(i->first));
switch (i->second.getType()) {
case qpid::types::VAR_VOID:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]