Author: aconway
Date: Fri Apr 29 15:21:20 2011
New Revision: 1097838

URL: http://svn.apache.org/viewvc?rev=1097838&view=rev
Log:
QPID-3235: clustered qpidd broker fails ocassionly the 
cluster_tests.ShortTests.test_route_update

Inconsistent stats changes on a Link were causing cluster
inconsistency. Fix is to disable those stats changes in a cluster.

Updated cluster_tests.py to reliably generate the error every time
without the fix.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1097838&r1=1097837&r2=1097838&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Fri Apr 29 15:21:20 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,7 +30,6 @@
 #include "qpid/framing/enum.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/broker/AclModule.h"
-#include "qpid/sys/ClusterSafe.h"
 
 using namespace qpid::broker;
 using qpid::framing::Buffer;
@@ -57,8 +56,8 @@ Link::Link(LinkRegistry*  _links,
            string&        _password,
            Broker*        _broker,
            Manageable*    parent)
-    : links(_links), store(_store), host(_host), port(_port), 
-      transport(_transport), 
+    : links(_links), store(_store), host(_host), port(_port),
+      transport(_transport),
       durable(_durable),
       authMechanism(_authMechanism), username(_username), password(_password),
       persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -97,7 +96,8 @@ void Link::setStateLH (int newState)
         return;
 
     state = newState;
-    if (mgmtObject == 0)
+
+    if (hideManagement())
         return;
 
     switch (state)
@@ -122,7 +122,7 @@ void Link::startConnectionLH ()
         QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << 
port);
     } catch(std::exception& e) {
         setStateLH(STATE_WAITING);
-        if (mgmtObject != 0)
+        if (!hideManagement())
             mgmtObject->set_lastError (e.what());
     }
 }
@@ -133,8 +133,7 @@ void Link::established ()
     addr << host << ":" << port;
     QPID_LOG (info, "Inter-broker link established to " << addr.str());
 
-    // Don't raise the management event in a cluster, other members wont't get 
this call.
-    if (broker && !broker->isInCluster())
+    if (!hideManagement() && agent)
         agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
 
     {
@@ -154,12 +153,11 @@ void Link::closed (int, std::string text
 
     connection = 0;
 
-    // Don't raise the management event in a cluster, other members wont't get 
this call.
     if (state == STATE_OPERATIONAL) {
         stringstream addr;
         addr << host << ":" << port;
         QPID_LOG (warning, "Inter-broker link disconnected from " << 
addr.str());
-        if (broker && !broker->isInCluster())
+        if (!hideManagement() && agent)
             agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
     }
 
@@ -172,7 +170,7 @@ void Link::closed (int, std::string text
     if (state != STATE_FAILED)
     {
         setStateLH(STATE_WAITING);
-        if (mgmtObject != 0)
+        if (!hideManagement())
             mgmtObject->set_lastError (text);
     }
 
@@ -221,7 +219,7 @@ void Link::cancel(Bridge::shared_ptr bri
 {
     {
         Mutex::ScopedLock mutex(lock);
-        
+
         for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
             if ((*i).get() == bridge.get()) {
                 created.erase(i);
@@ -277,9 +275,9 @@ void Link::maintenanceVisit ()
 {
     Mutex::ScopedLock mutex(lock);
 
-    if (connection && updateUrls) { 
+    if (connection && updateUrls) {
         urls.reset(connection->getKnownHosts());
-        QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << 
urls);        
+        QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
         updateUrls = false;
     }
 
@@ -309,7 +307,7 @@ void Link::reconnect(const qpid::Address
     port = a.port;
     transport = a.protocol;
     startConnectionLH();
-    if (mgmtObject != 0) {
+    if (!hideManagement()) {
         stringstream errorString;
         errorString << "Failed over to " << a;
         mgmtObject->set_lastError(errorString.str());
@@ -319,7 +317,7 @@ void Link::reconnect(const qpid::Address
 bool Link::tryFailover()
 {
     Address next;
-    if (urls.next(next) && 
+    if (urls.next(next) &&
         (next.host != host || next.port != port || next.protocol != 
transport)) {
         links->changeAddress(Address(transport, host, port), next);
         QPID_LOG(debug, "Link failing over to " << host << ":" << port);
@@ -329,6 +327,12 @@ bool Link::tryFailover()
     }
 }
 
+// Management updates for a linke are inconsistent in a cluster, so they are
+// suppressed.
+bool Link::hideManagement() const {
+    return !mgmtObject || ( broker && broker->isInCluster());
+}
+
 uint Link::nextChannel()
 {
     Mutex::ScopedLock mutex(lock);
@@ -341,7 +345,7 @@ void Link::notifyConnectionForced(const 
     Mutex::ScopedLock mutex(lock);
 
     setStateLH(STATE_FAILED);
-    if (mgmtObject != 0)
+    if (!hideManagement())
         mgmtObject->set_lastError(text);
 }
 
@@ -363,7 +367,7 @@ Link::shared_ptr Link::decode(LinkRegist
     string   authMechanism;
     string   username;
     string   password;
-    
+
     buffer.getShortString(host);
     port = buffer.getShort();
     buffer.getShortString(transport);
@@ -375,7 +379,7 @@ Link::shared_ptr Link::decode(LinkRegist
     return links.declare(host, port, transport, durable, authMechanism, 
username, password).first;
 }
 
-void Link::encode(Buffer& buffer) const 
+void Link::encode(Buffer& buffer) const
 {
     buffer.putShortString(string("link"));
     buffer.putShortString(host);
@@ -387,8 +391,8 @@ void Link::encode(Buffer& buffer) const 
     buffer.putShortString(password);
 }
 
-uint32_t Link::encodedSize() const 
-{ 
+uint32_t Link::encodedSize() const
+{
     return host.size() + 1 // short-string (host)
         + 5                // short-string ("link")
         + 2                // port

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1097838&r1=1097837&r2=1097838&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Fri Apr 29 15:21:20 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -85,6 +85,7 @@ namespace qpid {
             void destroy();                  // Called when mgmt deletes this 
link
             void ioThreadProcessing();       // Called on connection's IO 
thread by request
             bool tryFailover();              // Called during maintenance visit
+            bool hideManagement() const;
 
         public:
             typedef boost::shared_ptr<Link> shared_ptr;
@@ -122,12 +123,12 @@ namespace qpid {
 
             void notifyConnectionForced(const std::string text);
             void setPassive(bool p);
-            
+
             // PersistableConfig:
             void     setPersistenceId(uint64_t id) const;
             uint64_t getPersistenceId() const { return persistenceId; }
             uint32_t encodedSize() const;
-            void     encode(framing::Buffer& buffer) const; 
+            void     encode(framing::Buffer& buffer) const;
             const std::string& getName() const;
 
             static Link::shared_ptr decode(LinkRegistry& links, 
framing::Buffer& buffer);
@@ -135,6 +136,7 @@ namespace qpid {
             // Manageable entry points
             management::ManagementObject*    GetManagementObject(void) const;
             management::Manageable::status_t ManagementMethod(uint32_t, 
management::Args&, std::string&);
+
         };
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1097838&r1=1097837&r2=1097838&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Fri Apr 29 15:21:20 
2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -381,7 +381,7 @@ std::string LinkRegistry::createKey(cons
     return keystream.str();
 }
 
-void LinkRegistry::setPassive(bool p) 
+void LinkRegistry::setPassive(bool p)
 {
     Mutex::ScopedLock locker(lock);
     passiveChanged = p != passive;

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1097838&r1=1097837&r2=1097838&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Apr 29 15:21:20 2011
@@ -282,6 +282,13 @@ acl allow all all
             qpid_tool.wait()
             scanner.join()
         assert scanner.found
+        # Regression test for https://issues.apache.org/jira/browse/QPID-3235
+        # Inconsistent stats when changing elder.
+
+        # Force a change of elder
+        cluster0.start()
+        cluster0[0].kill()
+        time.sleep(2) # Allow a management interval to pass.
         # Verify logs are consistent
         cluster_test_logs.verify_logs()
 
@@ -602,7 +609,6 @@ acl allow all all
             send0.send("bar")     # Should fail, exchange is deleted.
             self.fail("Expected not-found exception")
         except qpid.messaging.NotFound: pass
-        # FIXME aconway 2011-04-19: s0 is broken, new session
         self.assert_browse(cluster[0].connect().session(), "q", ["foo"])
 
     def test_deleted_exchange_inconsistent(self):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to