Author: astitcher
Date: Thu Sep 13 21:36:27 2012
New Revision: 1384555
URL: http://svn.apache.org/viewvc?rev=1384555&view=rev
Log:
NO-JIRA: Removed now unused cluster specific ClusterSafe code.
Removed:
qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h
Modified:
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1384555&r1=1384554&r2=1384555&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Thu Sep 13 21:36:27 2012
@@ -936,7 +936,6 @@ set (qpidcommon_SOURCES
qpid/management/ManagementObject.cpp
qpid/sys/AggregateOutput.cpp
qpid/sys/AsynchIOHandler.cpp
- qpid/sys/ClusterSafe.cpp
qpid/sys/Dispatcher.cpp
qpid/sys/DispatchHandle.cpp
qpid/sys/Runnable.cpp
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1384555&r1=1384554&r2=1384555&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Thu Sep 13 21:36:27 2012
@@ -462,8 +462,6 @@ libqpidcommon_la_SOURCES += \
qpid/sys/AtomicValue_gcc.h \
qpid/sys/AtomicValue_mutex.h \
qpid/sys/BlockingQueue.h \
- qpid/sys/ClusterSafe.h \
- qpid/sys/ClusterSafe.cpp \
qpid/sys/Codec.h \
qpid/sys/ConnectionCodec.h \
qpid/sys/ConnectionInputHandler.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1384555&r1=1384554&r2=1384555&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Sep 13 21:36:27 2012
@@ -26,7 +26,6 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/sys/SecuritySettings.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -140,7 +139,7 @@ Connection::~Connection()
if (mgmtObject != 0) {
// In a cluster, Connections destroyed during shutdown are in
// a cluster-unsafe context. Don't raise an event in that case.
- if (!link && isClusterSafe())
+ if (!link)
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId,
ConnectionState::getUserId(), mgmtObject->get_remoteProperties()));
QPID_LOG_CAT(debug, model, "Delete connection. user:" <<
ConnectionState::getUserId()
<< " rhost:" << mgmtId );
@@ -188,7 +187,7 @@ bool isMessage(const AMQMethodBody* meth
void Connection::recordFromServer(const framing::AMQFrame& frame)
{
// Don't record management stats in cluster-unsafe contexts
- if (mgmtObject != 0 && isClusterSafe())
+ if (mgmtObject != 0)
{
qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats =
mgmtObject->getStatistics();
cStats->framesToClient += 1;
@@ -203,7 +202,7 @@ void Connection::recordFromServer(const
void Connection::recordFromClient(const framing::AMQFrame& frame)
{
// Don't record management stats in cluster-unsafe contexts
- if (mgmtObject != 0 && isClusterSafe())
+ if (mgmtObject != 0)
{
qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats =
mgmtObject->getStatistics();
cStats->framesFromClient += 1;
@@ -358,7 +357,6 @@ void Connection::doIoCallbacks() {
ScopedLock<Mutex> l(ioCallbackLock);
// Although IO callbacks execute in the connection thread context, they are
// not cluster safe because they are queued for execution in non-IO threads.
- ClusterUnsafeScope cus;
while (!ioCallbacks.empty()) {
boost::function0<void> cb = ioCallbacks.front();
ioCallbacks.pop();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp?rev=1384555&r1=1384554&r2=1384555&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp Thu Sep 13 21:36:27 2012
@@ -20,7 +20,6 @@
*/
#include "Lvq.h"
#include "MessageMap.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
namespace qpid {
@@ -33,7 +32,6 @@ Lvq::Lvq(const std::string& n, std::auto
void Lvq::push(Message& message, bool isRecovery)
{
- qpid::sys::assertClusterSafe();
QueueListeners::NotificationSet copy;
Message old;
bool removed;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1384555&r1=1384554&r2=1384555&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Sep 13 21:36:27 2012
@@ -43,7 +43,6 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include "qpid/types/Variant.h"
@@ -306,7 +305,6 @@ void Queue::process(Message& msg)
void Queue::release(const QueueCursor& position, bool markRedelivered)
{
- assertClusterSafe();
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
@@ -332,7 +330,6 @@ bool Queue::dequeueMessageAt(const Seque
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
QPID_LOG(debug, "Attempting to dequeue message at " << position);
QueueCursor cursor;
Message* msg = messages->find(position, &cursor);
@@ -352,7 +349,6 @@ bool Queue::dequeueMessageAt(const Seque
bool Queue::acquire(const QueueCursor& position, const std::string& consumer)
{
Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
Message* msg;
msg = messages->find(position);
@@ -479,7 +475,6 @@ bool Queue::find(SequenceNumber pos, Mes
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
{
- assertClusterSafe();
{
Mutex::ScopedLock locker(messageLock);
// NOTE: consumerCount is actually a count of all
@@ -737,7 +732,6 @@ uint32_t Queue::move(const Queue::shared
void Queue::push(Message& message, bool /*isRecovery*/)
{
- assertClusterSafe();
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1384555&r1=1384554&r2=1384555&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Thu Sep 13 21:36:27 2012
@@ -29,7 +29,6 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/Mutex.h"
#include "qpid/broker/SessionState.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1384555&r1=1384554&r2=1384555&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Sep 13 21:36:27 2012
@@ -35,7 +35,6 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/IsInSequenceSet.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
@@ -345,7 +344,6 @@ bool SemanticState::ConsumerImpl::delive
}
bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
{
- assertClusterSafe();
allocateCredit(msg);
DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
consumer, acquire, !ackExpected,
credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg));
@@ -376,7 +374,6 @@ bool SemanticState::ConsumerImpl::filter
bool SemanticState::ConsumerImpl::accept(const Message& msg)
{
- assertClusterSafe();
// TODO aconway 2009-06-08: if we have byte & message credit but
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
@@ -400,7 +397,6 @@ ostream& operator<<(ostream& o, const Co
void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
{
- assertClusterSafe();
Credit original = credit;
credit.consume(1,
qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
@@ -492,7 +488,6 @@ void SemanticState::requestDispatch()
void SemanticState::ConsumerImpl::requestDispatch()
{
- assertClusterSafe();
if (blocked) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
@@ -593,7 +588,6 @@ void SemanticState::stop(const std::stri
void SemanticState::ConsumerImpl::setWindowMode()
{
- assertClusterSafe();
credit.setWindowMode(true);
if (mgmtObject){
mgmtObject->set_creditMode("WINDOW");
@@ -602,7 +596,6 @@ void SemanticState::ConsumerImpl::setWin
void SemanticState::ConsumerImpl::setCreditMode()
{
- assertClusterSafe();
credit.setWindowMode(false);
if (mgmtObject){
mgmtObject->set_creditMode("CREDIT");
@@ -611,13 +604,11 @@ void SemanticState::ConsumerImpl::setCre
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
- assertClusterSafe();
credit.addByteCredit(value);
}
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
- assertClusterSafe();
credit.addMessageCredit(value);
}
@@ -645,7 +636,6 @@ void SemanticState::ConsumerImpl::flush(
void SemanticState::ConsumerImpl::stop()
{
- assertClusterSafe();
credit.cancel();
}
@@ -711,7 +701,6 @@ bool SemanticState::ConsumerImpl::doOutp
void SemanticState::ConsumerImpl::enableNotify()
{
Mutex::ScopedLock l(lock);
- assertClusterSafe();
notifyEnabled = true;
}
@@ -729,7 +718,6 @@ bool SemanticState::ConsumerImpl::isNoti
void SemanticState::ConsumerImpl::notify()
{
Mutex::ScopedLock l(lock);
- assertClusterSafe();
if (notifyEnabled) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
@@ -754,7 +742,6 @@ isInSequenceSetAnd(const SequenceSet& s,
}
void SemanticState::accepted(const SequenceSet& commands) {
- assertClusterSafe();
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1384555&r1=1384554&r2=1384555&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Thu Sep 13 21:36:27 2012
@@ -25,7 +25,6 @@
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
@@ -251,11 +250,6 @@ void SessionState::completeRcvMsg(Sequen
bool requiresAccept,
bool requiresSync)
{
- // Mark this as a cluster-unsafe scope since it can be called in
- // journal threads or connection threads as part of asynchronous
- // command completion.
- sys::ClusterUnsafeScope cus;
-
bool callSendCompletion = false;
receiverCompleted(id);
if (requiresAccept)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]