Author: kpvdr
Date: Fri Jul 20 13:50:48 2012
New Revision: 1363776
URL: http://svn.apache.org/viewvc?rev=1363776&view=rev
Log:
QPID-3858: WIP: Moved QueueAsycnContext from namespace
tests::storePerftools::asyncPerf to qpid::broker
Added:
qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.cpp
qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h
Removed:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
Modified:
qpid/branches/asyncstore/cpp/src/asyncstore.cmake
qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
Modified: qpid/branches/asyncstore/cpp/src/asyncstore.cmake
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/asyncstore.cmake?rev=1363776&r1=1363775&r2=1363776&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/asyncstore.cmake (original)
+++ qpid/branches/asyncstore/cpp/src/asyncstore.cmake Fri Jul 20 13:50:48 2012
@@ -62,6 +62,7 @@ set (asyncStore_SOURCES
qpid/broker/EnqueueHandle.cpp
qpid/broker/EventHandle.cpp
qpid/broker/MessageHandle.cpp
+ qpid/broker/QueueAsyncContext.cpp
qpid/broker/QueueHandle.cpp
qpid/broker/TxnAsyncContext.cpp
qpid/broker/TxnBuffer.cpp
Added: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.cpp?rev=1363776&view=auto
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.cpp (added)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.cpp Fri Jul
20 13:50:48 2012
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file QueueAsyncContext.cpp
+ */
+
+#include "QueueAsyncContext.h"
+
+#include "qpid/broker/PersistableMessage.h"
+
+#include <cassert>
+
+namespace qpid {
+namespace broker {
+
+QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ TxnHandle& th,
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq) :
+ m_q(q),
+ m_th(th),
+ m_rcb(rcb),
+ m_arq(arq)
+{
+ assert(m_q.get() != 0);
+}
+
+QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ boost::intrusive_ptr<PersistableMessage>
msg,
+ TxnHandle& th,
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq) :
+ m_q(q),
+ m_msg(msg),
+ m_th(th),
+ m_rcb(rcb),
+ m_arq(arq)
+{
+ assert(m_q.get() != 0);
+ assert(m_msg.get() != 0);
+}
+
+QueueAsyncContext::~QueueAsyncContext()
+{}
+
+boost::shared_ptr<PersistableQueue>
+QueueAsyncContext::getQueue() const
+{
+ return m_q;
+}
+
+boost::intrusive_ptr<PersistableMessage>
+QueueAsyncContext::getMessage() const
+{
+ return m_msg;
+}
+
+TxnHandle
+QueueAsyncContext::getTxnHandle() const
+{
+ return m_th;
+}
+
+AsyncResultQueue*
+QueueAsyncContext::getAsyncResultQueue() const
+{
+ return m_arq;
+}
+
+AsyncResultCallback
+QueueAsyncContext::getAsyncResultCallback() const
+{
+ return m_rcb;
+}
+
+void
+QueueAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const
+{
+ if (m_rcb) {
+ m_rcb(arh);
+ }
+}
+
+void
+QueueAsyncContext::destroy()
+{
+ delete this;
+}
+
+}} // namespace qpid::broker
Added: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h?rev=1363776&view=auto
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h (added)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h Fri Jul 20
13:50:48 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file QueueAsyncContext.h
+ */
+
+#ifndef qpid_broker_QueueAsyncContext_h_
+#define qpid_broker_QueueAsyncContext_h_
+
+#include "AsyncResultHandle.h"
+#include "AsyncStore.h"
+#include "TxnHandle.h"
+
+#include "qpid/asyncStore/AsyncOperation.h"
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class PersistableMessage;
+class PersistableQueue;
+
+typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
+
+class QueueAsyncContext: public BrokerAsyncContext
+{
+public:
+ QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ TxnHandle& th,
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq);
+ QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ boost::intrusive_ptr<PersistableMessage> msg,
+ TxnHandle& th,
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq);
+ virtual ~QueueAsyncContext();
+ boost::shared_ptr<PersistableQueue> getQueue() const;
+ boost::intrusive_ptr<PersistableMessage> getMessage() const;
+ TxnHandle getTxnHandle() const;
+ AsyncResultQueue* getAsyncResultQueue() const;
+ AsyncResultCallback getAsyncResultCallback() const;
+ void invokeCallback(const AsyncResultHandle* const arh) const;
+ void destroy();
+
+private:
+ boost::shared_ptr<PersistableQueue> m_q;
+ boost::intrusive_ptr<PersistableMessage> m_msg;
+ TxnHandle m_th; // TODO: get rid of this when
tests::storePerftools::asyncPerf::SimpleQueue has solved its TxnHandle issues.
+ AsyncResultCallback m_rcb;
+ AsyncResultQueue* const m_arq;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_QueueAsyncContext_h_
Modified: qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake?rev=1363776&r1=1363775&r2=1363776&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake (original)
+++ qpid/branches/asyncstore/cpp/src/tests/asyncstore.cmake Fri Jul 20 13:50:48
2012
@@ -59,7 +59,7 @@ set (asyncStorePerf_SOURCES
storePerftools/asyncPerf/MessageProducer.cpp
storePerftools/asyncPerf/PerfTest.cpp
storePerftools/asyncPerf/PersistableQueuedMessage.cpp
- storePerftools/asyncPerf/QueueAsyncContext.cpp
+# storePerftools/asyncPerf/QueueAsyncContext.cpp
storePerftools/asyncPerf/QueuedMessage.cpp
storePerftools/asyncPerf/SimpleMessage.cpp
storePerftools/asyncPerf/SimpleQueue.cpp
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp?rev=1363776&r1=1363775&r2=1363776&view=diff
==============================================================================
---
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
(original)
+++
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
Fri Jul 20 13:50:48 2012
@@ -27,10 +27,10 @@
#include "MessageConsumer.h"
#include "MessageDeque.h"
#include "PersistableQueuedMessage.h"
-#include "QueueAsyncContext.h"
#include "SimpleMessage.h"
#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/QueueAsyncContext.h"
#include "qpid/broker/TxnHandle.h"
#include <string.h> // memcpy()
@@ -90,10 +90,10 @@ void
SimpleQueue::asyncCreate()
{
if (m_store) {
- boost::shared_ptr<QueueAsyncContext> qac(new
QueueAsyncContext(shared_from_this(),
-
s_nullTxnHandle,
-
&handleAsyncCreateResult,
-
&m_resultQueue));
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new
qpid::broker::QueueAsyncContext(shared_from_this(),
+
s_nullTxnHandle,
+
&handleAsyncCreateResult,
+
&m_resultQueue));
m_store->submitCreate(m_queueHandle, this, qac);
++m_asyncOpCounter;
}
@@ -103,13 +103,15 @@ SimpleQueue::asyncCreate()
void
SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle*
const arh) {
if (arh) {
- boost::shared_ptr<QueueAsyncContext> qc =
boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
+
boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq =
boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing
a message)
- std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\":
Operation " << qc->getOpStr() << ": failure "
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " <<
qc->getOpStr() << ": failure "
<< arh->getErrNo() << " (" << arh->getErrMsg() << ")" <<
std::endl;
} else {
- qc->getQueue()->createComplete(qc);
+ sq->createComplete(qc);
}
}
}
@@ -120,10 +122,10 @@ SimpleQueue::asyncDestroy(const bool del
m_destroyPending = true;
if (m_store) {
if (deleteQueue) {
- boost::shared_ptr<QueueAsyncContext> qac(new
QueueAsyncContext(shared_from_this(),
-
s_nullTxnHandle,
-
&handleAsyncDestroyResult,
-
&m_resultQueue));
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new
qpid::broker::QueueAsyncContext(shared_from_this(),
+
s_nullTxnHandle,
+
&handleAsyncDestroyResult,
+
&m_resultQueue));
m_store->submitDestroy(m_queueHandle, qac);
++m_asyncOpCounter;
}
@@ -135,13 +137,15 @@ SimpleQueue::asyncDestroy(const bool del
void
SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle*
const arh) {
if (arh) {
- boost::shared_ptr<QueueAsyncContext> qc =
boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
+
boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq =
boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing
a message)
- std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\":
Operation " << qc->getOpStr() << ": failure "
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " <<
qc->getOpStr() << ": failure "
<< arh->getErrNo() << " (" << arh->getErrMsg() << ")" <<
std::endl;
} else {
- qc->getQueue()->destroyComplete(qc);
+ sq->destroyComplete(qc);
}
}
}
@@ -356,18 +360,16 @@ SimpleQueue::asyncEnqueue(qpid::broker::
{
assert(pqm.get());
// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is
set by store itself - find way to do this
- boost::shared_ptr<QueueAsyncContext> qac(new
QueueAsyncContext(shared_from_this(),
-
pqm->payload(),
- th,
-
&handleAsyncEnqueueResult,
-
&m_resultQueue));
- // TODO : This must be done from inside store, not here
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new
qpid::broker::QueueAsyncContext(shared_from_this(),
+
pqm->payload(),
+
th,
+
&handleAsyncEnqueueResult,
+
&m_resultQueue));
+ // TODO : This must be done from inside store, not here (the txn handle is
opaque outside the store)
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitEnqueue(pqm->enqHandle(),
- th,
- qac);
+ m_store->submitEnqueue(pqm->enqHandle(), th, qac);
++m_asyncOpCounter;
return true;
}
@@ -376,13 +378,15 @@ SimpleQueue::asyncEnqueue(qpid::broker::
void
SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle*
const arh) {
if (arh) {
- boost::shared_ptr<QueueAsyncContext> qc =
boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
+
boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq =
boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing
a message)
- std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\":
Operation " << qc->getOpStr() << ": failure "
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " <<
qc->getOpStr() << ": failure "
<< arh->getErrNo() << " (" << arh->getErrMsg() << ")" <<
std::endl;
} else {
- qc->getQueue()->enqueueComplete(qc);
+ sq->enqueueComplete(qc);
}
}
}
@@ -393,12 +397,12 @@ SimpleQueue::asyncDequeue(qpid::broker::
boost::shared_ptr<PersistableQueuedMessage> pqm)
{
assert(pqm.get());
- boost::shared_ptr<QueueAsyncContext> qac(new
QueueAsyncContext(shared_from_this(),
-
pqm->payload(),
- th,
-
&handleAsyncDequeueResult,
-
&m_resultQueue));
- // TODO : This must be done from inside store, not here
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new
qpid::broker::QueueAsyncContext(shared_from_this(),
+
pqm->payload(),
+
th,
+
&handleAsyncDequeueResult,
+
&m_resultQueue));
+ // TODO : This must be done from inside store, not here (the txn handle is
opaque outside the store)
if (th.isValid()) {
th.incrOpCnt();
}
@@ -412,13 +416,15 @@ SimpleQueue::asyncDequeue(qpid::broker::
void
SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle*
const arh) {
if (arh) {
- boost::shared_ptr<QueueAsyncContext> qc =
boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
+
boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq =
boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing
a message)
- std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\":
Operation " << qc->getOpStr() << ": failure "
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " <<
qc->getOpStr() << ": failure "
<< arh->getErrNo() << " (" << arh->getErrMsg() << ")" <<
std::endl;
} else {
- qc->getQueue()->dequeueComplete(qc);
+ sq->dequeueComplete(qc);
}
}
}
@@ -436,7 +442,7 @@ SimpleQueue::destroyCheck(const std::str
// private
void
-SimpleQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::createComplete(const
boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
{
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
@@ -444,7 +450,7 @@ SimpleQueue::createComplete(const boost:
// private
void
-SimpleQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::flushComplete(const
boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
{
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
@@ -452,7 +458,7 @@ SimpleQueue::flushComplete(const boost::
// private
void
-SimpleQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::destroyComplete(const
boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
{
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
@@ -461,12 +467,12 @@ SimpleQueue::destroyComplete(const boost
// private
void
-SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::enqueueComplete(const
boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
{
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
- // TODO : This must be done from inside store, not here
+ // TODO : This must be done from inside store, not here (the txn handle is
opaque outside the store)
qpid::broker::TxnHandle th = qc->getTxnHandle();
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
@@ -475,12 +481,12 @@ SimpleQueue::enqueueComplete(const boost
// private
void
-SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::dequeueComplete(const
boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
{
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
- // TODO : This must be done from inside store, not here
+ // TODO : This must be done from inside store, not here (the txn handle is
opaque outside the store)
qpid::broker::TxnHandle th = qc->getTxnHandle();
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
Modified:
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
URL:
http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h?rev=1363776&r1=1363775&r2=1363776&view=diff
==============================================================================
---
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
(original)
+++
qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
Fri Jul 20 13:50:48 2012
@@ -37,6 +37,7 @@
namespace qpid {
namespace broker {
class AsyncResultQueue;
+class QueueAsyncContext;
}
namespace framing {
class FieldTable;
@@ -49,7 +50,6 @@ namespace asyncPerf {
class MessageConsumer;
class Messages;
class PersistableQueuedMessage;
-class QueueAsyncContext;
class QueuedMessage;
class SimpleMessage;
@@ -146,11 +146,11 @@ private:
void destroyCheck(const std::string& opDescr) const;
// --- Async op completions (called through handleAsyncResult) ---
- void createComplete(const boost::shared_ptr<QueueAsyncContext> qc);
- void flushComplete(const boost::shared_ptr<QueueAsyncContext> qc);
- void destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc);
- void enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc);
- void dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void createComplete(const
boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
+ void flushComplete(const
boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
+ void destroyComplete(const
boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
+ void enqueueComplete(const
boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
+ void dequeueComplete(const
boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
};
}}} // namespace tests::storePerftools::asyncPerf
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]