Author: kgiusti
Date: Mon Jan 24 20:45:55 2011
New Revision: 1062965
URL: http://svn.apache.org/viewvc?rev=1062965&view=rev
Log:
QPID-2921: (partial) Async completion of message.transfer and execution.sync
commands
Added:
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h
Modified:
qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionContext.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h
qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am
qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp
Modified: qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am Mon Jan 24 20:45:55 2011
@@ -548,8 +548,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/HandlerImpl.h \
qpid/broker/HeadersExchange.cpp \
qpid/broker/HeadersExchange.h \
- qpid/broker/IncompleteMessageList.cpp \
- qpid/broker/IncompleteMessageList.h \
+ qpid/broker/AsyncCompletion.h \
qpid/broker/Link.cpp \
qpid/broker/Link.h \
qpid/broker/LinkRegistry.cpp \
Added: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h?rev=1062965&view=auto
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h (added)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h Mon Jan
24 20:45:55 2011
@@ -0,0 +1,171 @@
+#ifndef _Completion_
+#define _Completion_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+ namespace broker {
+
+ /**
+ * Class to implement asynchronous notification of completion.
+ *
+ * Use-case: An "initiator" needs to wait for a set of "completers" to
+ * finish a unit of work before an action can occur. This object
+ * tracks the progress of the set of completers, and allows the action
+ * to occur once all completers have signalled that they are done.
+ *
+ * The initiator and completers may be running in separate threads.
+ *
+ * The initiating thread is the thread that initiates the action,
+ * i.e. the connection read thread.
+ *
+ * A completing thread is any thread that contributes to completion,
+ * e.g. a store thread that does an async write.
+ * There may be zero or more completers.
+ *
+ * When the work is complete, a callback is invoked. The callback
+ * may be invoked in the Initiator thread, or one of the Completer
+ * threads. The callback is passed a flag indicating whether or not
+ * the callback is running under the context of the Initiator thread.
+ *
+ * Use model:
+ * 1) Initiator thread invokes begin()
+ * 2) After begin() has been invoked, zero or more Completers invoke
+ * startCompleter(). Completers may be running in the same or
+ * different thread as the Initiator, as long as they guarantee that
+ * startCompleter() is invoked at least once before the Initiator
invokes end().
+ * 3) Completers may invoke finishCompleter() at any time, even after
the
+ * initiator has invoked end(). finishCompleter() may be called from
any
+ * thread.
+ * 4) startCompleter()/finishCompleter() calls "nest": for each call to
+ * startCompleter(), a corresponding call to finishCompleter() must be
made.
+ * Once the last finishCompleter() is called, the Completer must no
longer
+ * reference the completion object.
+ * 5) The Initiator invokes end() at the point where it has finished
+ * dispatching work to the Completers, and is prepared for the callback
+ * handler to be invoked. Note: if there are no outstanding Completers
+ * pending when the Initiator invokes end(), the callback will be
invoked
+ * directly, and the sync parameter will be set true. This indicates
to the
+ * Initiator that the callback is executing in the context of the
end() call,
+ * and the Initiator is free to optimize the handling of the
completion,
+ * assuming no need for synchronization with Completer threads.
+ */
+ class AsyncCompletion {
+ public:
+ // encapsulates the completion callback handler
+ class CompletionHandler {
+ public:
+ virtual void operator() (bool) { /* bool == true if called via
end() */}
+ };
+
+ private:
+ mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded;
+ mutable qpid::sys::Monitor callbackLock;
+ bool inCallback;
+ void invokeCallback(bool sync) {
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
+ inCallback = true;
+ if (handler) {
+ qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
+ (*handler)(sync);
+ handler.reset();
+ }
+ inCallback = false;
+ callbackLock.notifyAll();
+ }
+
+ protected:
+ /** Invoked when all completers have signalled that they have
completed
+ * (via calls to finishCompleter()).
+ */
+ boost::shared_ptr<CompletionHandler> handler;
+
+ public:
+ AsyncCompletion() : completionsNeeded(0), inCallback(false) {};
+ virtual ~AsyncCompletion() { /* @todo KAG -
assert(completionsNeeded.get() == 0); */ };
+
+ /** True when all outstanding operations have compeleted
+ */
+ bool isDone()
+ {
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
+ return !inCallback && completionsNeeded.get() == 0;
+ }
+
+ /** Called to signal the start of an asynchronous operation. The
operation
+ * is considered pending until finishCompleter() is called.
+ * E.g. called when initiating an async store operation.
+ */
+ void startCompleter() { ++completionsNeeded; }
+
+ /** Called by completer to signal that it has finished the
operation started
+ * when startCompleter() was invoked.
+ * e.g. called when async write complete.
+ */
+ void finishCompleter()
+ {
+ if (--completionsNeeded == 0) {
+ invokeCallback(false);
+ }
+ }
+
+ /** called by initiator before any calls to startCompleter can be
done.
+ */
+ void begin() { startCompleter(); };
+
+ /** called by initiator after all potential completers have called
+ * startCompleter().
+ */
+ //void end(CompletionHandler::shared_ptr& _handler)
+ void end(boost::shared_ptr<CompletionHandler> _handler)
+ {
+ assert(completionsNeeded.get() > 0); // ensure begin() has
been called!
+ handler = _handler;
+ if (--completionsNeeded == 0) {
+ invokeCallback(true);
+ }
+ }
+
+ /** may be called by Initiator to cancel the callback registered
by end()
+ */
+ void cancel() {
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
+ handler.reset();
+ }
+
+ /** may be called by Initiator after all completers have been
added but
+ * prior to calling end(). Allows initiator to determine if it
_really_
+ * needs to wait for pending Completers (e.g. count > 1).
+ */
+ uint32_t getPendingCompleters() { return completionsNeeded.get(); }
+ };
+
+ }} // qpid::broker::
+#endif /*!_Completion_*/
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp Mon Jan 24
20:45:55 2011
@@ -50,14 +50,15 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
- inCallback(false), requiredCredit(0) {}
+ expiration(FAR_FUTURE), dequeueCallback(0),
+ inCallback(false), requiredCredit(0)
+{}
Message::Message(const Message& original) :
PersistableMessage(), frames(original.frames), persistenceId(0),
redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(original.expiration), enqueueCallback(0), dequeueCallback(0),
- inCallback(false), requiredCredit(0)
+ expiration(original.expiration), dequeueCallback(0),
+ inCallback(false), requiredCredit(0)
{
setExpiryPolicy(original.expiryPolicy);
}
@@ -431,30 +432,12 @@ struct ScopedSet {
};
}
-void Message::allEnqueuesComplete() {
- ScopedSet ss(callbackLock, inCallback);
- MessageCallback* cb = enqueueCallback;
- if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
-}
-
void Message::allDequeuesComplete() {
ScopedSet ss(callbackLock, inCallback);
MessageCallback* cb = dequeueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
-void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- enqueueCallback = &cb;
-}
-
-void Message::resetEnqueueCompleteCallback() {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- enqueueCallback = 0;
-}
-
void Message::setDequeueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
while (inCallback) callbackLock.wait();
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h Mon Jan 24
20:45:55 2011
@@ -156,10 +156,6 @@ public:
boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor)
const;
void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue*
qfor);
- /** Call cb when enqueue is complete, may call immediately. Holds cb by
reference. */
- void setEnqueueCompleteCallback(MessageCallback& cb);
- void resetEnqueueCompleteCallback();
-
/** Call cb when dequeue is complete, may call immediately. Holds cb by
reference. */
void setDequeueCompleteCallback(MessageCallback& cb);
void resetDequeueCompleteCallback();
@@ -170,7 +166,6 @@ public:
typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
MessageAdapter& getAdapter() const;
- void allEnqueuesComplete();
void allDequeuesComplete();
mutable sys::Mutex lock;
@@ -192,7 +187,6 @@ public:
mutable boost::intrusive_ptr<Message> empty;
sys::Monitor callbackLock;
- MessageCallback* enqueueCallback;
MessageCallback* dequeueCallback;
bool inCallback;
Modified:
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
(original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Mon
Jan 24 20:45:55 2011
@@ -34,7 +34,6 @@ class MessageStore;
PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() :
- asyncEnqueueCounter(0),
asyncDequeueCounter(0),
store(0)
{}
@@ -68,24 +67,6 @@ bool PersistableMessage::isContentReleas
return contentReleaseState.released;
}
-bool PersistableMessage::isEnqueueComplete() {
- sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
- return asyncEnqueueCounter == 0;
-}
-
-void PersistableMessage::enqueueComplete() {
- bool notify = false;
- {
- sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
- if (asyncEnqueueCounter > 0) {
- if (--asyncEnqueueCounter == 0) {
- notify = true;
- }
- }
- }
- if (notify)
- allEnqueuesComplete();
-}
bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
if (store && (queue->getPersistenceId()!=0)) {
@@ -109,12 +90,7 @@ void PersistableMessage::addToSyncList(P
void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue,
MessageStore* _store) {
addToSyncList(queue, _store);
- enqueueAsync();
-}
-
-void PersistableMessage::enqueueAsync() {
- sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
- asyncEnqueueCounter++;
+ enqueueStart();
}
bool PersistableMessage::isDequeueComplete() {
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h
(original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h Mon
Jan 24 20:45:55 2011
@@ -31,6 +31,7 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Mutex.h"
#include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/AsyncCompletion.h"
namespace qpid {
namespace broker {
@@ -43,18 +44,18 @@ class MessageStore;
class PersistableMessage : public Persistable
{
typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
- sys::Mutex asyncEnqueueLock;
sys::Mutex asyncDequeueLock;
sys::Mutex storeLock;
-
+
/**
- * Tracks the number of outstanding asynchronous enqueue
- * operations. When the message is enqueued asynchronously the
- * count is incremented; when that enqueue completes it is
- * decremented. Thus when it is 0, there are no outstanding
- * enqueues.
+ * Tracks the number of outstanding asynchronous operations that must
+ * complete before the message can be considered safely received by the
+ * broker. E.g. all enqueues have completed, the message has been written
+ * to store, credit has been replenished, etc. Once all outstanding
+ * operations have completed, the transfer of this message from the client
+ * may be considered complete.
*/
- int asyncEnqueueCounter;
+ AsyncCompletion receiveCompletion;
/**
* Tracks the number of outstanding asynchronous dequeue
@@ -65,7 +66,6 @@ class PersistableMessage : public Persis
*/
int asyncDequeueCounter;
- void enqueueAsync();
void dequeueAsync();
syncList synclist;
@@ -80,8 +80,6 @@ class PersistableMessage : public Persis
ContentReleaseState contentReleaseState;
protected:
- /** Called when all enqueues are complete for this message. */
- virtual void allEnqueuesComplete() = 0;
/** Called when all dequeues are complete for this message. */
virtual void allDequeuesComplete() = 0;
@@ -115,9 +113,9 @@ class PersistableMessage : public Persis
virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
- QPID_BROKER_EXTERN bool isEnqueueComplete();
-
- QPID_BROKER_EXTERN void enqueueComplete();
+ QPID_BROKER_EXTERN bool isReceiveComplete() { return
receiveCompletion.isDone(); }
+ QPID_BROKER_EXTERN void enqueueStart() {
receiveCompletion.startCompleter(); }
+ QPID_BROKER_EXTERN void enqueueComplete() {
receiveCompletion.finishCompleter(); }
QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
MessageStore* _store);
@@ -133,7 +131,8 @@ class PersistableMessage : public Persis
bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore*
_store);
-
+
+ QPID_BROKER_EXTERN AsyncCompletion& getReceiveCompletion() { return
receiveCompletion; }
};
}}
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp Mon Jan 24
20:45:55 2011
@@ -157,13 +157,8 @@ void Queue::deliver(boost::intrusive_ptr
//drop message
QPID_LOG(info, "Dropping excluded message from " << getName());
} else {
- // if no store then mark as enqueued
- if (!enqueue(0, msg)){
- push(msg);
- msg->enqueueComplete();
- }else {
- push(msg);
- }
+ enqueue(0, msg);
+ push(msg);
mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
@@ -688,7 +683,7 @@ uint32_t Queue::getEnqueueCompleteMessag
//NOTE: don't need to use checkLvqReplace() here as it
//is only relevant for LVQ which does not support persistence
//so the enqueueComplete check has no effect
- if ( i->payload->isEnqueueComplete() ) count ++;
+ if ( i->payload->isReceiveComplete() ) count ++;
}
return count;
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
(original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Jan
24 20:45:55 2011
@@ -24,6 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/broker/SessionState.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -586,7 +587,12 @@ framing::MessageResumeResult SessionAdap
-void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op
+void SessionAdapter::ExecutionHandlerImpl::sync()
+{
+ session.addPendingExecutionSync();
+ /** @todo KAG - need a generic mechanism to allow a command to returning
"not completed" status back to SessionState */
+
+}
void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber&
/*commandId*/, const string& /*value*/)
{
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionContext.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionContext.h?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionContext.h Mon Jan
24 20:45:55 2011
@@ -46,6 +46,7 @@ class SessionContext : public OwnershipT
virtual Broker& getBroker() = 0;
virtual uint16_t getChannel() const = 0;
virtual const SessionId& getSessionId() const = 0;
+ virtual void addPendingExecutionSync() = 0;
};
}} // namespace qpid::broker
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Jan
24 20:45:55 2011
@@ -59,7 +59,6 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore()),
- enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
mgmtObject(0),
rateFlowcontrol(0)
{
@@ -94,6 +93,18 @@ SessionState::~SessionState() {
if (flowControlTimer)
flowControlTimer->cancel();
+
+ // clean up any outstanding incomplete receive messages
+
+ qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
+ while (!incompleteRcvMsgs.empty()) {
+ boost::shared_ptr<IncompleteRcvMsg> ref(incompleteRcvMsgs.front());
+ incompleteRcvMsgs.pop_front();
+ {
+ qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock);
+ ref->cancel();
+ }
+ }
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -195,15 +206,17 @@ Manageable::status_t SessionState::Manag
}
void SessionState::handleCommand(framing::AMQMethodBody* method, const
SequenceNumber& id) {
+ currentCommandComplete = true; // assumed, can be overridden by
invoker method (this sucks).
Invoker::Result invocation = invoke(adapter, *method);
- receiverCompleted(id);
+ if (currentCommandComplete) receiverCompleted(id);
+
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " <<
*method));
} else if (invocation.hasResult()) {
getProxy().getExecution().result(id, invocation.getResult());
}
- if (method->isSync()) {
- incomplete.process(enqueuedOp, true);
+
+ if (method->isSync() && currentCommandComplete) {
sendAcceptAndCompletion();
}
}
@@ -247,21 +260,24 @@ void SessionState::handleContent(AMQFram
msg->getFrames().append(header);
}
msg->setPublisher(&getConnection());
+
+ msg->getReceiveCompletion().begin();
semanticState.handle(msg);
msgBuilder.end();
-
- if (msg->isEnqueueComplete()) {
- enqueued(msg);
- } else {
- incomplete.add(msg);
- }
-
- //hold up execution until async enqueue is complete
- if (msg->getFrames().getMethod()->isSync()) {
- incomplete.process(enqueuedOp, true);
- sendAcceptAndCompletion();
+ if (msg->getReceiveCompletion().getPendingCompleters() == 1) {
+ // There are no other pending receive completers (just this
SessionState).
+ // Mark the message as completed.
+ completeRcvMsg( msg );
} else {
- incomplete.process(enqueuedOp, false);
+ // There are outstanding receive completers. Save the message
until
+ // they are all done.
+ QPID_LOG(debug, getId() << ": delaying completion of msg seq=" <<
msg->getCommandId());
+ boost::shared_ptr<IncompleteRcvMsg> pendingMsg(new
IncompleteRcvMsg(*this, msg));
+ {
+ qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
+ incompleteRcvMsgs.push_back(pendingMsg);
+ }
+ msg->getReceiveCompletion().end( pendingMsg ); // allows others
to complete
}
}
@@ -312,11 +328,36 @@ void SessionState::sendAcceptAndCompleti
sendCompletion();
}
-void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+/** Invoked when the given inbound message is finished being processed
+ * by all interested parties (eg. it is done being enqueued to all queues,
+ * its credit has been accounted for, etc). At this point, msg is considered
+ * by this receiver as 'completed' (as defined by AMQP 0_10)
+ */
+void SessionState::completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message>
msg)
{
+ bool callSendCompletion = false;
receiverCompleted(msg->getCommandId());
if (msg->requiresAccept())
+ // will cause msg's seq to appear in the next message.accept we send.
accepted.add(msg->getCommandId());
+
+ // Are there any outstanding Execution.Sync commands pending the
+ // completion of this msg? If so, complete them.
+ while (!pendingExecutionSyncs.empty() &&
+ receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
+ const SequenceNumber& id = pendingExecutionSyncs.front();
+ pendingExecutionSyncs.pop();
+ QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is
completed.");
+ receiverCompleted(id);
+ callSendCompletion = true; // likely peer is pending for this
completion.
+ }
+
+ // if the sender has requested immediate notification of the completion...
+ if (msg->getFrames().getMethod()->isSync()) {
+ sendAcceptAndCompletion();
+ } else if (callSendCompletion) {
+ sendCompletion();
+ }
}
void SessionState::handleIn(AMQFrame& frame) {
@@ -389,4 +430,80 @@ framing::AMQP_ClientProxy& SessionState:
return handler->getClusterOrderProxy();
}
+
+// Current received command is an execution.sync command.
+// Complete this command only when all preceding commands have completed.
+// (called via the invoker() in handleCommand() above)
+void SessionState::addPendingExecutionSync()
+{
+ SequenceNumber syncCommandId = receiverGetCurrent();
+ if (receiverGetIncomplete().front() < syncCommandId) {
+ currentCommandComplete = false;
+ pendingExecutionSyncs.push(syncCommandId);
+ QPID_LOG(debug, getId() << ": delaying completion of execution.sync "
<< syncCommandId);
+ }
+}
+
+
+/** Invoked by the asynchronous completer associated with
+ * a received msg that is pending Completion. May be invoked
+ * by the SessionState directly (sync == true), or some external
+ * entity (!sync).
+ */
+void SessionState::IncompleteRcvMsg::operator() (bool sync)
+{
+ QPID_LOG(debug, ": async completion callback for msg seq=" <<
msg->getCommandId() << " sync=" << sync);
+ boost::shared_ptr<IncompleteRcvMsg> tmp;
+ {
+ qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock);
+ for (std::list< boost::shared_ptr<IncompleteRcvMsg> >::iterator i =
session->incompleteRcvMsgs.begin();
+ i != session->incompleteRcvMsgs.end(); ++i) {
+ if (i->get() == this) {
+ tmp.swap(*i);
+ session->incompleteRcvMsgs.remove(*i);
+ break;
+ }
+ }
+ }
+
+ if (session->isAttached()) {
+ if (sync) {
+ QPID_LOG(debug, ": receive completed for msg seq=" <<
msg->getCommandId());
+ session->completeRcvMsg(msg);
+ } else { // potentially called from a different thread
+ QPID_LOG(debug, ": scheduling completion for msg seq=" <<
msg->getCommandId());
+
session->getConnection().requestIOProcessing(boost::bind(&SessionState::IncompleteRcvMsg::scheduledCompleter,
tmp));
+ }
+ }
+}
+
+
+/** Scheduled from IncompleteRcvMsg callback, completes the message receive
+ * asynchronously
+ */
+void
SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<SessionState::IncompleteRcvMsg>
iMsg)
+{
+ QPID_LOG(debug, ": scheduled completion for msg seq=" <<
iMsg->msg->getCommandId());
+ if (iMsg->session && iMsg->session->isAttached()) {
+ QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg
seq=" << iMsg->msg->getCommandId());
+ iMsg->session->completeRcvMsg(iMsg->msg);
+ }
+}
+
+
+/** Cancels a pending incomplete receive message completion callback. Note
+ * well: will wait for the callback to finish if it is currently in progress
+ * on another thread.
+ */
+void SessionState::IncompleteRcvMsg::cancel()
+{
+ QPID_LOG(debug, session->getId() << ": cancelling outstanding completion
for msg seq=" << msg->getCommandId());
+ // Cancel the message complete callback. On return, we are guaranteed
there
+ // will be no outstanding calls to
SessionState::IncompleteRcvMsg::operator() (bool sync)
+ msg->getReceiveCompletion().cancel();
+ // there may be calls to
SessionState::IncompleteRcvMsg::scheduledCompleter() pending,
+ // clear the session so scheduledCompleter() will ignore this
IncompleteRcvMsg.
+ session = 0;
+}
+
}} // namespace qpid::broker
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h Mon Jan 24
20:45:55 2011
@@ -30,10 +30,11 @@
#include "qmf/org/apache/qpid/broker/Session.h"
#include "qpid/broker/SessionAdapter.h"
#include "qpid/broker/DeliveryAdapter.h"
-#include "qpid/broker/IncompleteMessageList.h"
+#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/sys/Monitor.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
@@ -122,11 +123,15 @@ class SessionState : public qpid::Sessio
const SessionId& getSessionId() const { return getId(); }
+ // Used by ExecutionHandler sync command processing. Notifies
+ // the SessionState of a received Execution.Sync command.
+ void addPendingExecutionSync();
+
private:
void handleCommand(framing::AMQMethodBody* method, const
framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, const
framing::SequenceNumber& id);
- void enqueued(boost::intrusive_ptr<Message> msg);
+ void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg);
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
@@ -152,8 +157,6 @@ class SessionState : public qpid::Sessio
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
- IncompleteMessageList incomplete;
- IncompleteMessageList::CompletionListener enqueuedOp;
qmf::org::apache::qpid::broker::Session* mgmtObject;
qpid::framing::SequenceSet accepted;
@@ -162,7 +165,28 @@ class SessionState : public qpid::Sessio
boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
+ // sequence numbers for pending received Execution.Sync commands
+ std::queue<SequenceNumber> pendingExecutionSyncs;
+ bool currentCommandComplete;
+
+ class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler
+ {
+ public:
+ IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message>
_msg)
+ : session(&_session), msg(_msg) {}
+ virtual void operator() (bool sync);
+ void cancel(); // cancel pending incomplete callback [operator()
above].
+
+ private:
+ SessionState *session;
+ boost::intrusive_ptr<Message> msg;
+ static void scheduledCompleter( boost::shared_ptr<IncompleteRcvMsg>
incompleteMsg );
+ };
+ std::list< boost::shared_ptr<IncompleteRcvMsg> > incompleteRcvMsgs;
+ qpid::sys::Mutex incompleteRcvMsgsLock;
+
friend class SessionManager;
+ friend class IncompleteRcvMsg;
};
Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/Makefile.am Mon Jan 24 20:45:55
2011
@@ -87,7 +87,6 @@ unit_test_SOURCES= unit_test.cpp unit_te
InlineVector.cpp \
SequenceSet.cpp \
StringUtils.cpp \
- IncompleteMessageList.cpp \
RangeSet.cpp \
AtomicValue.cpp \
QueueTest.cpp \
Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp?rev=1062965&r1=1062964&r2=1062965&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp Mon Jan 24
20:45:55 2011
@@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testPrepare)
BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second);
BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first);
BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second);
- BOOST_CHECK_EQUAL( true, (
boost::static_pointer_cast<PersistableMessage>(t.msg))->isEnqueueComplete());
+ BOOST_CHECK_EQUAL( true, (
boost::static_pointer_cast<PersistableMessage>(t.msg))->isReceiveComplete());
}
QPID_AUTO_TEST_CASE(testCommit)
@@ -87,7 +87,7 @@ QPID_AUTO_TEST_CASE(testCommit)
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
- BOOST_CHECK_EQUAL( true,
(boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete());
+ BOOST_CHECK_EQUAL( true,
(boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isReceiveComplete());
BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]