Author: aconway
Date: Fri Apr 29 15:21:20 2011
New Revision: 1097838
URL: http://svn.apache.org/viewvc?rev=1097838&view=rev
Log:
QPID-3235: clustered qpidd broker occasionally fails
cluster_tests.ShortTests.test_route_update
Inconsistent stats changes on a Link were causing cluster
inconsistency. Fix is to disable those stats changes in a cluster.
Updated cluster_tests.py to reliably generate the error every time
without the fix.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1097838&r1=1097837&r2=1097838&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Fri Apr 29 15:21:20 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,7 +30,6 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
-#include "qpid/sys/ClusterSafe.h"
using namespace qpid::broker;
using qpid::framing::Buffer;
@@ -57,8 +56,8 @@ Link::Link(LinkRegistry* _links,
string& _password,
Broker* _broker,
Manageable* parent)
- : links(_links), store(_store), host(_host), port(_port),
- transport(_transport),
+ : links(_links), store(_store), host(_host), port(_port),
+ transport(_transport),
durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -97,7 +96,8 @@ void Link::setStateLH (int newState)
return;
state = newState;
- if (mgmtObject == 0)
+
+ if (hideManagement())
return;
switch (state)
@@ -122,7 +122,7 @@ void Link::startConnectionLH ()
QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" <<
port);
} catch(std::exception& e) {
setStateLH(STATE_WAITING);
- if (mgmtObject != 0)
+ if (!hideManagement())
mgmtObject->set_lastError (e.what());
}
}
@@ -133,8 +133,7 @@ void Link::established ()
addr << host << ":" << port;
QPID_LOG (info, "Inter-broker link established to " << addr.str());
- // Don't raise the management event in a cluster, other members wont't get
this call.
- if (broker && !broker->isInCluster())
+ if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
{
@@ -154,12 +153,11 @@ void Link::closed (int, std::string text
connection = 0;
- // Don't raise the management event in a cluster, other members wont't get
this call.
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
QPID_LOG (warning, "Inter-broker link disconnected from " <<
addr.str());
- if (broker && !broker->isInCluster())
+ if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
@@ -172,7 +170,7 @@ void Link::closed (int, std::string text
if (state != STATE_FAILED)
{
setStateLH(STATE_WAITING);
- if (mgmtObject != 0)
+ if (!hideManagement())
mgmtObject->set_lastError (text);
}
@@ -221,7 +219,7 @@ void Link::cancel(Bridge::shared_ptr bri
{
{
Mutex::ScopedLock mutex(lock);
-
+
for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
if ((*i).get() == bridge.get()) {
created.erase(i);
@@ -277,9 +275,9 @@ void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
- if (connection && updateUrls) {
+ if (connection && updateUrls) {
urls.reset(connection->getKnownHosts());
- QPID_LOG(debug, "Known hosts for peer of inter-broker link: " <<
urls);
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
updateUrls = false;
}
@@ -309,7 +307,7 @@ void Link::reconnect(const qpid::Address
port = a.port;
transport = a.protocol;
startConnectionLH();
- if (mgmtObject != 0) {
+ if (!hideManagement()) {
stringstream errorString;
errorString << "Failed over to " << a;
mgmtObject->set_lastError(errorString.str());
@@ -319,7 +317,7 @@ void Link::reconnect(const qpid::Address
bool Link::tryFailover()
{
Address next;
- if (urls.next(next) &&
+ if (urls.next(next) &&
(next.host != host || next.port != port || next.protocol !=
transport)) {
links->changeAddress(Address(transport, host, port), next);
QPID_LOG(debug, "Link failing over to " << host << ":" << port);
@@ -329,6 +327,12 @@ bool Link::tryFailover()
}
}
+// Management updates for a link are inconsistent in a cluster, so they are
+// suppressed.
+bool Link::hideManagement() const {
+ return !mgmtObject || ( broker && broker->isInCluster());
+}
+
uint Link::nextChannel()
{
Mutex::ScopedLock mutex(lock);
@@ -341,7 +345,7 @@ void Link::notifyConnectionForced(const
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_FAILED);
- if (mgmtObject != 0)
+ if (!hideManagement())
mgmtObject->set_lastError(text);
}
@@ -363,7 +367,7 @@ Link::shared_ptr Link::decode(LinkRegist
string authMechanism;
string username;
string password;
-
+
buffer.getShortString(host);
port = buffer.getShort();
buffer.getShortString(transport);
@@ -375,7 +379,7 @@ Link::shared_ptr Link::decode(LinkRegist
return links.declare(host, port, transport, durable, authMechanism,
username, password).first;
}
-void Link::encode(Buffer& buffer) const
+void Link::encode(Buffer& buffer) const
{
buffer.putShortString(string("link"));
buffer.putShortString(host);
@@ -387,8 +391,8 @@ void Link::encode(Buffer& buffer) const
buffer.putShortString(password);
}
-uint32_t Link::encodedSize() const
-{
+uint32_t Link::encodedSize() const
+{
return host.size() + 1 // short-string (host)
+ 5 // short-string ("link")
+ 2 // port
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1097838&r1=1097837&r2=1097838&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Fri Apr 29 15:21:20 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -85,6 +85,7 @@ namespace qpid {
void destroy(); // Called when mgmt deletes this
link
void ioThreadProcessing(); // Called on connection's IO
thread by request
bool tryFailover(); // Called during maintenance visit
+ bool hideManagement() const;
public:
typedef boost::shared_ptr<Link> shared_ptr;
@@ -122,12 +123,12 @@ namespace qpid {
void notifyConnectionForced(const std::string text);
void setPassive(bool p);
-
+
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
- void encode(framing::Buffer& buffer) const;
+ void encode(framing::Buffer& buffer) const;
const std::string& getName() const;
static Link::shared_ptr decode(LinkRegistry& links,
framing::Buffer& buffer);
@@ -135,6 +136,7 @@ namespace qpid {
// Manageable entry points
management::ManagementObject* GetManagementObject(void) const;
management::Manageable::status_t ManagementMethod(uint32_t,
management::Args&, std::string&);
+
};
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1097838&r1=1097837&r2=1097838&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Fri Apr 29 15:21:20
2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -381,7 +381,7 @@ std::string LinkRegistry::createKey(cons
return keystream.str();
}
-void LinkRegistry::setPassive(bool p)
+void LinkRegistry::setPassive(bool p)
{
Mutex::ScopedLock locker(lock);
passiveChanged = p != passive;
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1097838&r1=1097837&r2=1097838&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Apr 29 15:21:20 2011
@@ -282,6 +282,13 @@ acl allow all all
qpid_tool.wait()
scanner.join()
assert scanner.found
+ # Regression test for https://issues.apache.org/jira/browse/QPID-3235
+ # Inconsistent stats when changing elder.
+
+ # Force a change of elder
+ cluster0.start()
+ cluster0[0].kill()
+ time.sleep(2) # Allow a management interval to pass.
# Verify logs are consistent
cluster_test_logs.verify_logs()
@@ -602,7 +609,6 @@ acl allow all all
send0.send("bar") # Should fail, exchange is deleted.
self.fail("Expected not-found exception")
except qpid.messaging.NotFound: pass
- # FIXME aconway 2011-04-19: s0 is broken, new session
self.assert_browse(cluster[0].connect().session(), "q", ["foo"])
def test_deleted_exchange_inconsistent(self):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]