Author: aconway
Date: Mon Oct 5 15:08:25 2009
New Revision: 821830
URL: http://svn.apache.org/viewvc?rev=821830&view=rev
Log:
Fixed: cluster update did not preserve delivery-properties.exchange on messages.
Also minor improvements:
- Improved debug logging for consumers.
- Cluster test scripts work with the latest corosync: membership of the ais group is no longer required (the group is used only if the user is a member).
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h
qpid/trunk/qpid/cpp/src/tests/ais_check
qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Oct 5 15:08:25
2009
@@ -302,6 +302,18 @@
return !blocked;
}
+namespace {
+struct ConsumerName {
+ const SemanticState::ConsumerImpl& consumer;
+ ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {}
+};
+
+ostream& operator<<(ostream& o, const ConsumerName& pc) {
+ return o << pc.consumer.getName() << " on "
+ << pc.consumer.getParent().getSession().getSessionId();
+}
+}
+
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
uint32_t originalMsgCredit = msgCredit;
@@ -312,7 +324,7 @@
if (byteCredit != 0xFFFFFFFF) {
byteCredit -= msg->getRequiredCredit();
}
- QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
+ QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << " bytes: " << originalByteCredit << " msgs: " <<
originalMsgCredit
<< " now bytes: " << byteCredit << " msgs: " << msgCredit);
@@ -320,15 +332,13 @@
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
- if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit <
msg->getRequiredCredit())) {
- QPID_LOG(debug, "Not enough credit for '" << name << "' on " <<
parent
- << ", bytes: " << byteCredit << " msgs: " << msgCredit);
- return false;
- } else {
- QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
- << " bytes: " << byteCredit << " msgs: " << msgCredit);
- return true;
- }
+ bool enoughCredit = msgCredit > 0 &&
+ (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
+ QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient
credit for ")
+ << ConsumerName(*this)
+ << ", have bytes: " << byteCredit << " msgs: " << msgCredit
+ << ", need " << msg->getRequiredCredit() << " bytes");
+ return enoughCredit;
}
SemanticState::ConsumerImpl::~ConsumerImpl() {}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon Oct 5 15:08:25 2009
@@ -129,6 +129,7 @@
const framing::FieldTable& getArguments() const { return arguments; }
SemanticState& getParent() { return *parent; }
+ const SemanticState& getParent() const { return *parent; }
};
private:
@@ -163,6 +164,7 @@
~SemanticState();
SessionContext& getSession() { return session; }
+ const SessionContext& getSession() const { return session; }
ConsumerImpl& find(const std::string& destination);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Mon Oct 5 15:08:25
2009
@@ -28,7 +28,7 @@
#include "qpid/sys/OutputControl.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/OwnershipToken.h"
-
+#include "qpid/SessionId.h"
#include <boost/noncopyable.hpp>
@@ -45,6 +45,7 @@
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
virtual uint16_t getChannel() const = 0;
+ virtual const SessionId& getSessionId() const = 0;
};
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon Oct 5 15:08:25 2009
@@ -118,6 +118,8 @@
bool processSendCredit(uint32_t msgs);
+ const SessionId& getSessionId() const { return getId(); }
+
private:
void handleCommand(framing::AMQMethodBody* method, const
framing::SequenceNumber& id);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Oct 5 15:08:25 2009
@@ -64,7 +64,8 @@
proxy(ioHandler),
nextIn(0),
nextOut(0),
- sendMsgCredit(0)
+ sendMsgCredit(0),
+ doClearDeliveryPropertiesExchange(true)
{
channel.next = connectionShared.get();
}
@@ -396,11 +397,16 @@
{
AMQFrame header(content.getHeader());
- // Client is not allowed to set the delivery-properties.exchange.
- AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
- if (headerp && headerp->get<DeliveryProperties>())
- headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
-
+ // doClearDeliveryPropertiesExchange is set by cluster update client so
+ // it can send messages with delivery-properties.exchange set.
+ //
+ if (doClearDeliveryPropertiesExchange) {
+ // Normal client is not allowed to set the delivery-properties.exchange
+ // so clear it here.
+ AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
+ if (headerp && headerp->get<DeliveryProperties>())
+ headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
+ }
header.setFirstSegment(false);
uint64_t data_length = content.getData().length();
if(data_length > 0){
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Mon Oct 5 15:08:25 2009
@@ -130,6 +130,8 @@
*/
boost::shared_ptr<ConnectionImpl> getConnection();
+ void setDoClearDeliveryPropertiesExchange(bool b=true) {
doClearDeliveryPropertiesExchange = b; }
+
private:
enum State {
INACTIVE,
@@ -243,6 +245,8 @@
// Only keep track of message credit
sys::Semaphore* sendMsgCredit;
+ bool doClearDeliveryPropertiesExchange;
+
friend class client::SessionHandler;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Oct 5 15:08:25
2009
@@ -209,9 +209,16 @@
ClusterConnectionProxy(session).expiryId(*expiryId);
}
+ // We can't send a broker::Message via the normal client API,
+ // and it would be expensive to copy it into a client::Message
+ // so we go a bit under the client API covers here.
+ //
SessionBase_0_10Access sb(session);
+ // Disable client code that clears the delivery-properties.exchange
+ sb.get()->setDoClearDeliveryPropertiesExchange(false);
framing::MessageTransferBody transfer(
- framing::ProtocolVersion(), UpdateClient::UPDATE,
message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+ framing::ProtocolVersion(), UpdateClient::UPDATE,
message::ACCEPT_MODE_NONE,
+ message::ACQUIRE_MODE_PRE_ACQUIRED);
sb.get()->send(transfer, message.payload->getFrames(),
!message.payload->isContentReleased());
if (message.payload->isContentReleased()){
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h Mon Oct 5 15:08:25
2009
@@ -30,7 +30,7 @@
namespace cluster {
/**
- * A keyless exchange (like fanout exchange) that does not modify
deliver-properties.exchange
+ * A keyless exchange (like fanout exchange) that does not modify
delivery-properties.exchange
* on messages.
*/
class UpdateExchange : public broker::FanOutExchange
Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h Mon Oct 5 15:08:25 2009
@@ -75,10 +75,10 @@
/** @param localIndex can be -1 meaning don't automatically start a local
broker.
* A local broker can be started with addLocal().
*/
- ClusterFixture(size_t n, const Args& args, int localIndex=0);
+ ClusterFixture(size_t n, const Args& args, int localIndex=-1);
/*...@param updateArgs function is passed the index of the cluster member
and can update the arguments. */
- ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs,
int localIndex);
+ ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs,
int localIndex=-1);
void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
void add(); // Add a broker.
Modified: qpid/trunk/qpid/cpp/src/tests/ais_check
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ qpid/trunk/qpid/cpp/src/tests/ais_check Mon Oct 5 15:08:25 2009
@@ -21,29 +21,14 @@
srcdir=`dirname $0`
# Check AIS requirements and run tests if found.
-id -nG | grep '\<ais\>' >/dev/null || \
- NOGROUP="You are not a member of the ais group."
-ps -u root | grep 'aisexec\|corosync' >/dev/null || \
- NOAISEXEC="The aisexec or corosync daemon is not running as root"
-
-if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
- cat <<EOF
-
- =========== WARNING: NOT RUNNING AIS TESTS ==============
-
- Tests that depend on the openais library (used for clustering)
- will not be run because:
- $NOGROUP
- $NOAISEXEC
-
- ==========================================================
-
-EOF
+ps -u root | grep 'aisexec\|corosync' >/dev/null || {
+ echo WARNING: Skipping cluster tests, the aisexec or corosync daemon is
not running.
exit 0; # A warning, not a failure.
-fi
+}
-# Execute command with the ais group set.
+# Execute command with the ais group set if user is a member.
with_ais_group() {
- id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the
ais group."; exit 1; }
- echo $* | newgrp ais
+ if id -nG | grep '\<ais\>' >/dev/null; then sg -c "$*"
+ else "$@"
+ fi
}
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Oct 5 15:08:25 2009
@@ -85,6 +85,12 @@
args += "--no-data-dir";
}
+ClusterFixture::Args prepareArgs(const bool durableFlag = false) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ return args;
+}
+
// Timeout for tests that wait for messages
const sys::Duration TIMEOUT=sys::TIME_SEC/4;
@@ -596,16 +602,19 @@
}
}
-QPID_AUTO_TEST_CASE(testCatchupSharedState) {
+// Test that message data and delivery properties are updated properly.
+QPID_AUTO_TEST_CASE(testUpdateMessages) {
ClusterFixture::Args args;
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- // Create some shared state.
+ // Create messages with different delivery properties
c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q");
c0.session.messageTransfer(arg::content=makeMessage("foo","q",
durableFlag));
- c0.session.messageTransfer(arg::content=makeMessage("bar","q",
durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("bar","q",
durableFlag),
+ arg::destination="amq.fanout");
while (c0.session.queueQuery("q").getMessageCount() != 2)
sys::usleep(1000); // Wait for message to show up on broker 0.
@@ -628,9 +637,12 @@
BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "foo");
+ BOOST_CHECK(m.getDeliveryProperties().hasExchange());
BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "");
BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "bar");
+ BOOST_CHECK(m.getDeliveryProperties().hasExchange());
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout");
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
// Add another broker, don't wait for join - should be stalled till ready.
@@ -1100,6 +1112,44 @@
}
}
-QPID_AUTO_TEST_SUITE_END()
+// Browse for 1 message with byte credit, return true if a message was
+// received false if not.
+bool browseByteCredit(Client& c, const string& q, int n, Message& m) {
+ SubscriptionSettings browseSettings(
+ FlowControl(1, n, false), // 1 message, n bytes credit, no window
+ ACCEPT_MODE_NONE,
+ ACQUIRE_MODE_NOT_ACQUIRED,
+ 0 // No auto-ack.
+ );
+ LocalQueue lq;
+ Subscription s = c.subs.subscribe(lq, q, browseSettings);
+ c.session.messageFlush(arg::destination=q, arg::sync=true);
+ c.session.sync();
+ c.subs.getSubscription(q).cancel();
+ return lq.get(m, 0); // No timeout, flush should push message thru.
+}
+
+// Ensure cluster update preserves exact message size, use byte credt as test.
+QPID_AUTO_TEST_CASE(testExactByteCredit) {
+ ClusterFixture cluster(1, prepareArgs(), -1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("MyMessage", "q"));
+ cluster.add();
+
+ int size=36; // Size of message on broker: headers+body
+ Client c1(cluster[1], "c1");
+ Message m;
+
+ // Ensure we get the message with exact credit.
+ BOOST_CHECK(browseByteCredit(c0, "q", size, m));
+ BOOST_CHECK(browseByteCredit(c1, "q", size, m));
+ // and not with one byte less.
+ BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m));
+ BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m));
+}
+
+
+QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/clustered_replication_test?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/clustered_replication_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/clustered_replication_test Mon Oct 5
15:08:25 2009
@@ -30,10 +30,6 @@
echo $1
exit 1
}
-with_ais_group() {
- id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the
ais group." 1>&2; exit 1; }
- echo $* | newgrp ais
-}
stop_brokers() {
if [[ $PRIMARY1 ]] ; then
Modified: qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_cluster_tests?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_cluster_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_cluster_tests Mon Oct 5 15:08:25 2009
@@ -22,11 +22,11 @@
# Check that top_builddir and srcdir are set
# If not, assume local run from test dir
if [ -z ${top_builddir} -o -z ${srcdir} ]; then
- srcdir=`pwd`
+ srcdir=`dirname $0`
top_builddir=${srcdir}/../../
fi
TEST_DIR=${top_builddir}/src/tests
-. `dirname $0`/python_env.sh
+. $srcdir/python_env.sh
if test -z $1; then
CLUSTER_TEST="$PYTHON_COMMANDS/qpid-python-test -m cluster_tests
cluster_tests.ShortTests.\*"
@@ -35,32 +35,8 @@
echo "Running $1..."
fi
-
# Check AIS requirements
-id -nG | grep '\<ais\>' > /dev/null || NOGROUP="You are not a member of the
ais group."
-ps -u root | grep 'aisexec\|corosync' > /dev/null || NOAISEXEC="The aisexec or
corosync daemon is not running as root."
-if ! test -d ${PYTHON_DIR}; then
- NO_PYTHON_DIR="PYTHON_DIR=\"${PYTHON_DIR}\" not found or does not
exist."
-fi
-
-if test -n "${NOGROUP}" -o -n "${NOAISEXEC}" -o -n "${NO_PYTHON_DIR}"; then
- cat <<EOF
-
- ======== WARNING: PYTHON CLUSTER TESTS DISABLED ===========
-
- Tests that depend on the openais library (used for clustering)
- and python will not be run because:
-
- ${NOGROUP}
- ${NOAISEXEC}
- ${NO_PYTHON_DIR}
-
- ===========================================================
-
-EOF
- exit 0
-fi
-
+. $srcdir/ais_check
# Check XML exchange requirements
XML_LIB=$srcdir/../.libs/xml.so
@@ -103,7 +79,7 @@
# Run the test
-sg ais -c "${CLUSTER_TEST}"
+with_ais_group ${CLUSTER_TEST}
RETCODE=$?
if test x${RETCODE} != x0; then
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]