Author: aconway
Date: Mon Oct  5 15:08:25 2009
New Revision: 821830

URL: http://svn.apache.org/viewvc?rev=821830&view=rev
Log:
Fixed: cluster update did not preserve delivery-properties.exchange on messages.

Also minor improvements:
 - Improved debug logging for consumers.
 - Cluster test scripts work with the latest corosync: don't check/set the ais group.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
    qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h
    qpid/trunk/qpid/cpp/src/tests/ais_check
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
    qpid/trunk/qpid/cpp/src/tests/run_cluster_tests

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Oct  5 15:08:25 
2009
@@ -302,6 +302,18 @@
     return !blocked;
 }
 
+namespace {
+struct ConsumerName {
+    const SemanticState::ConsumerImpl& consumer;
+    ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {}
+};
+
+ostream& operator<<(ostream& o, const ConsumerName& pc) {
+    return o << pc.consumer.getName() << " on "
+             << pc.consumer.getParent().getSession().getSessionId();
+}
+}
+
 void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
 {
     uint32_t originalMsgCredit = msgCredit;
@@ -312,7 +324,7 @@
     if (byteCredit != 0xFFFFFFFF) {
         byteCredit -= msg->getRequiredCredit();
     }
-    QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
+    QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
              << ", was " << " bytes: " << originalByteCredit << " msgs: " << 
originalMsgCredit
              << " now bytes: " << byteCredit << " msgs: " << msgCredit);
     
@@ -320,15 +332,13 @@
 
 bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
 {
-    if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < 
msg->getRequiredCredit())) {
-        QPID_LOG(debug, "Not enough credit for '" << name  << "' on " << 
parent 
-                 << ", bytes: " << byteCredit << " msgs: " << msgCredit);
-        return false;
-    } else {
-        QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
-                 << " bytes: " << byteCredit << " msgs: " << msgCredit);
-        return true;
-    }
+    bool enoughCredit = msgCredit > 0 &&
+        (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
+    QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient 
credit for ")
+             << ConsumerName(*this)
+             << ", have bytes: " << byteCredit << " msgs: " << msgCredit
+             << ", need " << msg->getRequiredCredit() << " bytes");
+    return enoughCredit;
 }
 
 SemanticState::ConsumerImpl::~ConsumerImpl() {}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon Oct  5 15:08:25 2009
@@ -129,6 +129,7 @@
         const framing::FieldTable& getArguments() const { return arguments; }
 
         SemanticState& getParent() { return *parent; }
+        const SemanticState& getParent() const { return *parent; }
     };
 
   private:
@@ -163,6 +164,7 @@
     ~SemanticState();
 
     SessionContext& getSession() { return session; }
+    const SessionContext& getSession() const { return session; }
 
     ConsumerImpl& find(const std::string& destination);
     

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Mon Oct  5 15:08:25 
2009
@@ -28,7 +28,7 @@
 #include "qpid/sys/OutputControl.h"
 #include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/OwnershipToken.h"
-
+#include "qpid/SessionId.h"
 
 #include <boost/noncopyable.hpp>
 
@@ -45,6 +45,7 @@
     virtual framing::AMQP_ClientProxy& getProxy() = 0;
     virtual Broker& getBroker() = 0;
     virtual uint16_t getChannel() const = 0;
+    virtual const SessionId& getSessionId() const = 0;
 };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon Oct  5 15:08:25 2009
@@ -118,6 +118,8 @@
 
     bool processSendCredit(uint32_t msgs);
 
+    const SessionId& getSessionId() const { return getId(); }
+
   private:
 
     void handleCommand(framing::AMQMethodBody* method, const 
framing::SequenceNumber& id);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Oct  5 15:08:25 2009
@@ -64,7 +64,8 @@
       proxy(ioHandler),
       nextIn(0),
       nextOut(0),
-      sendMsgCredit(0)
+      sendMsgCredit(0),
+      doClearDeliveryPropertiesExchange(true)
 {
     channel.next = connectionShared.get();
 }
@@ -396,11 +397,16 @@
 {
     AMQFrame header(content.getHeader());
 
-    // Client is not allowed to set the delivery-properties.exchange.
-    AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
-    if (headerp && headerp->get<DeliveryProperties>())
-        headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
-
+    // doClearDeliveryPropertiesExchange is set by cluster update client so
+    // it can send messages with delivery-properties.exchange set.
+    //
+    if (doClearDeliveryPropertiesExchange) {
+        // Normal client is not allowed to set the delivery-properties.exchange
+        // so clear it here.
+        AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
+        if (headerp && headerp->get<DeliveryProperties>())
+            headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
+    }
     header.setFirstSegment(false);
     uint64_t data_length = content.getData().length();
     if(data_length > 0){

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Mon Oct  5 15:08:25 2009
@@ -130,6 +130,8 @@
      */
     boost::shared_ptr<ConnectionImpl> getConnection();
 
+    void setDoClearDeliveryPropertiesExchange(bool b=true) { 
doClearDeliveryPropertiesExchange = b; }
+
 private:
     enum State {
         INACTIVE,
@@ -243,6 +245,8 @@
     // Only keep track of message credit 
     sys::Semaphore* sendMsgCredit;
 
+    bool doClearDeliveryPropertiesExchange;
+
   friend class client::SessionHandler;
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Oct  5 15:08:25 
2009
@@ -209,9 +209,16 @@
             ClusterConnectionProxy(session).expiryId(*expiryId);
         }
 
+        // We can't send a broker::Message via the normal client API,
+        // and it would be expensive to copy it into a client::Message
+        // so we go a bit under the client API covers here.
+        //
         SessionBase_0_10Access sb(session);
+        // Disable client code that clears the delivery-properties.exchange
+        sb.get()->setDoClearDeliveryPropertiesExchange(false);
         framing::MessageTransferBody transfer(
-            framing::ProtocolVersion(), UpdateClient::UPDATE, 
message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+            framing::ProtocolVersion(), UpdateClient::UPDATE, 
message::ACCEPT_MODE_NONE,
+            message::ACQUIRE_MODE_PRE_ACQUIRED);
         
         sb.get()->send(transfer, message.payload->getFrames(), 
!message.payload->isContentReleased());
         if (message.payload->isContentReleased()){

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h Mon Oct  5 15:08:25 
2009
@@ -30,7 +30,7 @@
 namespace cluster {
 
 /**
- * A keyless exchange (like fanout exchange) that does not modify 
deliver-properties.exchange
+ * A keyless exchange (like fanout exchange) that does not modify 
delivery-properties.exchange
  * on messages.
  */
 class UpdateExchange : public broker::FanOutExchange

Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h Mon Oct  5 15:08:25 2009
@@ -75,10 +75,10 @@
     /** @param localIndex can be -1 meaning don't automatically start a local 
broker.
      * A local broker can be started with addLocal().
      */
-    ClusterFixture(size_t n, const Args& args, int localIndex=0);
+    ClusterFixture(size_t n, const Args& args, int localIndex=-1);
 
     /** @param updateArgs function is passed the index of the cluster member 
and can update the arguments. */
-    ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs, 
int localIndex);
+    ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs, 
int localIndex=-1);
 
     void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
     void add();                 // Add a broker.

Modified: qpid/trunk/qpid/cpp/src/tests/ais_check
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ qpid/trunk/qpid/cpp/src/tests/ais_check Mon Oct  5 15:08:25 2009
@@ -21,29 +21,14 @@
 srcdir=`dirname $0`
 
 # Check AIS requirements and run tests if found.
-id -nG | grep '\<ais\>' >/dev/null || \
-    NOGROUP="You are not a member of the ais group."
-ps -u root | grep 'aisexec\|corosync' >/dev/null || \
-    NOAISEXEC="The aisexec or corosync daemon is not running as root"
-
-if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
-    cat <<EOF
-
-    =========== WARNING: NOT RUNNING AIS TESTS ==============
-
-    Tests that depend on the openais library (used for clustering)
-    will not be run because:
-    $NOGROUP
-    $NOAISEXEC
-
-    ==========================================================
-    
-EOF
+ps -u root | grep 'aisexec\|corosync' >/dev/null || {
+    echo WARNING: Skipping cluster tests, the aisexec or corosync daemon is 
not running.
     exit 0;                    # A warning, not a failure.
-fi
+}
 
-# Execute command with the ais group set.
+# Execute command with the ais group set if user is a member.
 with_ais_group() {
-    id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the 
ais group."; exit 1; }
-    echo $* | newgrp ais
+    if id -nG | grep '\<ais\>' >/dev/null; then sg -c "$*"
+    else "$@"
+    fi
 }

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Oct  5 15:08:25 2009
@@ -85,6 +85,12 @@
         args += "--no-data-dir";
 }
 
+ClusterFixture::Args prepareArgs(const bool durableFlag = false) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    return args;
+}
+
 // Timeout for tests that wait for messages
 const sys::Duration TIMEOUT=sys::TIME_SEC/4;
 
@@ -596,16 +602,19 @@
     }
 }
 
-QPID_AUTO_TEST_CASE(testCatchupSharedState) {
+// Test that message data and delivery properties are updated properly.
+QPID_AUTO_TEST_CASE(testUpdateMessages) {
     ClusterFixture::Args args;
     prepareArgs(args, durableFlag);
     ClusterFixture cluster(1, args, -1);
     Client c0(cluster[0], "c0");
 
-    // Create some shared state.
+    // Create messages with different delivery properties
     c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q");
     c0.session.messageTransfer(arg::content=makeMessage("foo","q", 
durableFlag));
-    c0.session.messageTransfer(arg::content=makeMessage("bar","q", 
durableFlag));
+    c0.session.messageTransfer(arg::content=makeMessage("bar","q", 
durableFlag),
+                               arg::destination="amq.fanout");
 
     while (c0.session.queueQuery("q").getMessageCount() != 2)
         sys::usleep(1000);    // Wait for message to show up on broker 0.
@@ -628,9 +637,12 @@
 
     BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "foo");
+    BOOST_CHECK(m.getDeliveryProperties().hasExchange());
     BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "");
     BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "bar");
+    BOOST_CHECK(m.getDeliveryProperties().hasExchange());
+    BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout");
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
 
     // Add another broker, don't wait for join - should be stalled till ready.
@@ -1100,6 +1112,44 @@
     }
 }
 
-QPID_AUTO_TEST_SUITE_END()
 
+// Browse for 1 message with byte credit, return true if a message was
+// received false if not.
+bool browseByteCredit(Client& c, const string& q, int n, Message& m) {
+    SubscriptionSettings browseSettings(
+        FlowControl(1, n, false), // 1 message, n bytes credit, no window
+        ACCEPT_MODE_NONE,
+        ACQUIRE_MODE_NOT_ACQUIRED,
+        0                       // No auto-ack.
+    );
+    LocalQueue lq;
+    Subscription s = c.subs.subscribe(lq, q, browseSettings);
+    c.session.messageFlush(arg::destination=q, arg::sync=true);
+    c.session.sync();
+    c.subs.getSubscription(q).cancel();
+    return lq.get(m, 0);        // No timeout, flush should push message thru.
+}
+
+// Ensure cluster update preserves exact message size, use byte credit as test.
+QPID_AUTO_TEST_CASE(testExactByteCredit) {
+    ClusterFixture cluster(1, prepareArgs(), -1);
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare("q");
+    c0.session.messageTransfer(arg::content=Message("MyMessage", "q"));
+    cluster.add();
+
+    int size=36;                // Size of message on broker: headers+body
+    Client c1(cluster[1], "c1");
+    Message m;
+
+    // Ensure we get the message with exact credit.
+    BOOST_CHECK(browseByteCredit(c0, "q", size, m));
+    BOOST_CHECK(browseByteCredit(c1, "q", size, m));
+    // and not with one byte less.
+    BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m));
+    BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m));
+}
+
+
+QPID_AUTO_TEST_SUITE_END()
 }} // namespace qpid::tests

Modified: qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/clustered_replication_test?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/clustered_replication_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/clustered_replication_test Mon Oct  5 
15:08:25 2009
@@ -30,10 +30,6 @@
     echo $1
     exit 1
 }
-with_ais_group() {
-    id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the 
ais group." 1>&2; exit 1; }
-    echo $* | newgrp ais
-}
 
 stop_brokers() {
     if [[ $PRIMARY1 ]] ; then

Modified: qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_cluster_tests?rev=821830&r1=821829&r2=821830&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_cluster_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_cluster_tests Mon Oct  5 15:08:25 2009
@@ -22,11 +22,11 @@
 # Check that top_builddir and srcdir are set
 # If not, assume local run from test dir
 if [ -z ${top_builddir} -o -z ${srcdir} ]; then
-       srcdir=`pwd`
+       srcdir=`dirname $0`
        top_builddir=${srcdir}/../../
 fi
 TEST_DIR=${top_builddir}/src/tests
-. `dirname $0`/python_env.sh
+. $srcdir/python_env.sh
 
 if test -z $1; then
        CLUSTER_TEST="$PYTHON_COMMANDS/qpid-python-test -m cluster_tests 
cluster_tests.ShortTests.\*"
@@ -35,32 +35,8 @@
        echo "Running $1..."
 fi
 
-
 # Check AIS requirements
-id -nG | grep '\<ais\>' > /dev/null || NOGROUP="You are not a member of the 
ais group."
-ps -u root | grep 'aisexec\|corosync' > /dev/null || NOAISEXEC="The aisexec or 
corosync daemon is not running as root."
-if ! test -d ${PYTHON_DIR}; then
-       NO_PYTHON_DIR="PYTHON_DIR=\"${PYTHON_DIR}\" not found or does not 
exist."
-fi
-
-if test -n "${NOGROUP}" -o -n "${NOAISEXEC}" -o -n "${NO_PYTHON_DIR}"; then
-    cat <<EOF
-
-    ======== WARNING: PYTHON CLUSTER TESTS DISABLED ===========
-
-    Tests that depend on the openais library (used for clustering)
-    and python will not be run because:
-
-    ${NOGROUP}
-    ${NOAISEXEC}
-    ${NO_PYTHON_DIR}
-
-    ===========================================================
-
-EOF
-       exit 0
-fi
-
+. $srcdir/ais_check
 
 # Check XML exchange requirements
 XML_LIB=$srcdir/../.libs/xml.so
@@ -103,7 +79,7 @@
 
 
 # Run the test
-sg ais -c "${CLUSTER_TEST}"
+with_ais_group ${CLUSTER_TEST}
 RETCODE=$?
 
 if test x${RETCODE} != x0; then 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to