Author: gsim
Date: Fri May 24 00:50:47 2013
New Revision: 1485909
URL: http://svn.apache.org/r1485909
Log:
QPID-4859: ensure flush is called on journals
Added:
qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.h
Modified:
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Fri May 24 00:50:47 2013
@@ -1198,6 +1198,7 @@ set (qpidbroker_SOURCES
qpid/broker/ExchangeRegistry.cpp
qpid/broker/FanOutExchange.cpp
qpid/broker/HeadersExchange.cpp
+ qpid/broker/IngressCompletion.cpp
qpid/broker/Link.cpp
qpid/broker/LinkRegistry.cpp
qpid/broker/LossyQueue.cpp
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Fri May 24 00:50:47 2013
@@ -635,6 +635,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/HeadersExchange.cpp \
qpid/broker/HeadersExchange.h \
qpid/broker/AsyncCompletion.h \
+ qpid/broker/IngressCompletion.h \
+ qpid/broker/IngressCompletion.cpp \
qpid/broker/IndexedDeque.h \
qpid/broker/Link.cpp \
qpid/broker/Link.h \
Added: qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.cpp?rev=1485909&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.cpp Fri May 24
00:50:47 2013
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "IngressCompletion.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+IngressCompletion::~IngressCompletion() {}
+
+void IngressCompletion::enqueueAsync(boost::shared_ptr<Queue> q)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ queues.push_back(q);
+}
+
+void IngressCompletion::flush()
+{
+ Queues copy;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ queues.swap(copy);
+ }
+ for (Queues::const_iterator i = copy.begin(); i != copy.end(); ++i) {
+ (*i)->flush();
+ }
+}
+}} // namespace qpid::broker
Added: qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.h?rev=1485909&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.h Fri May 24 00:50:47
2013
@@ -0,0 +1,51 @@
+#ifndef QPID_BROKER_INGRESSCOMPLETION_H
+#define QPID_BROKER_INGRESSCOMPLETION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AsyncCompletion.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+#include <vector>
+
+namespace qpid {
+namespace broker {
+
+class Queue;
+/**
+ * An AsyncCompletion object for async enqueues, that can be flushed
+ * when needed
+ */
+class IngressCompletion : public AsyncCompletion
+{
+ public:
+ virtual ~IngressCompletion();
+
+ void enqueueAsync(boost::shared_ptr<Queue>);
+ void flush();
+ private:
+ typedef std::vector<boost::shared_ptr<Queue> > Queues;
+ Queues queues;
+ qpid::sys::Mutex lock;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_INGRESSCOMPLETION_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri May 24 00:50:47 2013
@@ -50,7 +50,7 @@ enum MessageState
class Message {
public:
- class Encoding : public AsyncCompletion
+ class Encoding : public IngressCompletion
{
public:
virtual ~Encoding() {}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Fri May 24
00:50:47 2013
@@ -1,3 +1,4 @@
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,7 +22,7 @@
#include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Queue.h"
#include <iostream>
using namespace qpid::broker;
@@ -32,7 +33,7 @@ namespace broker {
PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() : ingressCompletion(0),
persistenceId(0) {}
-void
PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompletion>
i)
+void
PersistableMessage::setIngressCompletion(boost::intrusive_ptr<IngressCompletion>
i)
{
ingressCompletion = i.get();
/**
@@ -57,21 +58,13 @@ void PersistableMessage::setIngressCompl
}
}
-
-void PersistableMessage::flush()
-{
- //TODO: is this really the right place for this?
-}
-
-
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr,
MessageStore*)
+void PersistableMessage::enqueueAsync(boost::shared_ptr<Queue> q)
{
enqueueStart();
+ ingressCompletion->enqueueAsync(q);
}
-bool PersistableMessage::isDequeueComplete() { return false; }
void PersistableMessage::dequeueComplete() {}
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr,
MessageStore*) {}
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Fri May 24
00:50:47 2013
@@ -31,8 +31,8 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/amqp_framing.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/broker/PersistableQueue.h"
-#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/broker/IngressCompletion.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace types {
@@ -57,36 +57,27 @@ class PersistableMessage : public Persis
* operations have completed, the transfer of this message from the client
* may be considered complete.
*/
- AsyncCompletion* ingressCompletion;
- boost::intrusive_ptr<AsyncCompletion> holder;
+ IngressCompletion* ingressCompletion;
+ boost::intrusive_ptr<IngressCompletion> holder;
mutable uint64_t persistenceId;
public:
virtual ~PersistableMessage();
PersistableMessage();
- void flush();
-
- QPID_BROKER_EXTERN void setStore(MessageStore*);
-
virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
/** track the progress of a message received by the broker - see
ingressCompletion above */
QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return
ingressCompletion->isDone(); }
- QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return
*ingressCompletion; }
- QPID_BROKER_EXTERN void
setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i);
+ QPID_BROKER_INLINE_EXTERN IngressCompletion& getIngressCompletion() {
return *ingressCompletion; }
+ QPID_BROKER_EXTERN void
setIngressCompletion(boost::intrusive_ptr<IngressCompletion> i);
QPID_BROKER_INLINE_EXTERN void enqueueStart() {
ingressCompletion->startCompleter(); }
QPID_BROKER_INLINE_EXTERN void enqueueComplete() {
ingressCompletion->finishCompleter(); }
- QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
- MessageStore* _store);
-
+ QPID_BROKER_EXTERN void enqueueAsync(boost::shared_ptr<Queue> queue);
- QPID_BROKER_EXTERN bool isDequeueComplete();
QPID_BROKER_EXTERN void dequeueComplete();
- QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
- MessageStore* _store);
uint64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(uint64_t _persistenceId) const { persistenceId =
_persistenceId; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri May 24 00:50:47 2013
@@ -846,7 +846,7 @@ bool Queue::enqueue(TransactionContext*
// when it considers the message stored.
boost::intrusive_ptr<PersistableMessage> pmsg =
msg.getPersistentContext();
assert(pmsg);
- pmsg->enqueueAsync(shared_from_this(), store);
+ pmsg->enqueueAsync(shared_from_this());
try {
store->enqueue(ctxt, pmsg, *this);
} catch (...) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]