Author: aconway
Date: Tue Aug 4 18:38:38 2015
New Revision: 1694094
URL: http://svn.apache.org/r1694094
Log:
QPID-6577: HA - backup broker messages are larger than primary messages.
Under the 0-10 protocol (used by HA) brokers add an "exchange" property to each
message for the exchange the message arrived on .This is different (and
sometimes longer) on the backup brokers from the primary since on the backups
the message arrives on a special replication exchange.
This fixes backup brokers to not modify the exchange property on messages.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=1694094&r1=1694093&r2=1694094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Tue Aug 4 18:38:38
2015
@@ -37,7 +37,7 @@ namespace
const std::string QPID_MANAGEMENT("qpid.management");
}
-MessageBuilder::MessageBuilder() : state(DORMANT) {}
+MessageBuilder::MessageBuilder() : state(DORMANT), copyExchange(true) {}
void MessageBuilder::handle(AMQFrame& frame)
{
@@ -60,7 +60,10 @@ void MessageBuilder::handle(AMQFrame& fr
header.setEof(false);
message->getFrames().append(header);
} else if (type == HEADER_BODY) {
-
frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setExchange(exchange);
+ if (copyExchange) {
+
frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->
+ setExchange(exchange);
+ }
} else {
throw CommandInvalidException(
QPID_MSG("Invalid frame sequence for message, expected header
or content got "
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h?rev=1694094&r1=1694093&r2=1694094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h Tue Aug 4 18:38:38
2015
@@ -43,11 +43,14 @@ namespace qpid {
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer>
getMessage();
QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id);
void end();
+ void setCopyExchange(bool value) { copyExchange = value; }
+
private:
enum State {DORMANT, METHOD, HEADER, CONTENT};
State state;
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer>
message;
std::string exchange;
+ bool copyExchange;
void checkType(uint8_t expected, uint8_t actual);
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1694094&r1=1694093&r2=1694094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Aug 4 18:38:38 2015
@@ -147,6 +147,9 @@ class SessionState : public qpid::Sessio
/** Send result and completion for a given command to the client. */
void completeCommand(SequenceNumber id, bool requiresAccept, bool
requiresSync,
const std::string& result);
+
+ MessageBuilder& getMessageBuilder() { return msgBuilder; }
+
private:
void handleCommand(framing::AMQMethodBody* method);
void handleContent(framing::AMQFrame& frame);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1694094&r1=1694093&r2=1694094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Aug 4 18:38:38 2015
@@ -35,6 +35,7 @@
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -248,6 +249,10 @@ void QueueReplicator::initializeBridge(B
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
sessionHandler = &sessionHandler_;
+ if (sessionHandler->getSession()) {
+ // Don't overwrite the exchange property set on the primary.
+
sessionHandler->getSession()->getMessageBuilder().setCopyExchange(false);
+ }
AMQP_ServerProxy peer(sessionHandler->out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge&
args(bridge.getArgs());
FieldTable arguments;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]