Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h Mon Sep 24 13:49:13 2012 @@ -24,9 +24,9 @@ #ifndef qpid_broker_QueueAsyncContext_h_ #define qpid_broker_QueueAsyncContext_h_ -#include "AsyncResultHandle.h" -#include "AsyncStore.h" -#include "TxnHandle.h" +#include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/AsyncStore.h" +#include "qpid/broker/TxnHandle.h" #include "qpid/asyncStore/AsyncOperation.h" @@ -36,7 +36,7 @@ namespace qpid { namespace broker { -class PersistableMessage; +//class PersistableMessage; class PersistableQueue; typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.cpp (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.cpp Mon Sep 24 13:49:13 2012 @@ -41,7 +41,8 @@ namespace qpid { namespace broker { -QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {} +//QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {} +QueueFactory::QueueFactory() : broker(0), asyncStore(0), parent(0) {} boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings) { @@ -51,12 +52,15 @@ boost::shared_ptr<Queue> QueueFactory::c // -> if 'ring' policy is in use then subclass boost::shared_ptr<Queue> queue; if (settings.dropMessagesAtLimit) { - queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker)); +// queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker)); + queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? asyncStore : 0, parent, broker)); } else if (settings.lvqKey.size()) { std::auto_ptr<MessageMap> map(new MessageMap(settings.lvqKey)); - queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? store : 0, parent, broker)); +// queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? store : 0, parent, broker)); + queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? asyncStore : 0, parent, broker)); } else { - queue = boost::shared_ptr<Queue>(new Queue(name, settings, settings.durable ? store : 0, parent, broker)); +// queue = boost::shared_ptr<Queue>(new Queue(name, settings, settings.durable ? store : 0, parent, broker)); + queue = boost::shared_ptr<Queue>(new Queue(name, settings, settings.durable ? asyncStore : 0, parent, broker)); } //2. determine Messages type (i.e. structure) @@ -98,13 +102,15 @@ Broker* QueueFactory::getBroker() { return broker; } -void QueueFactory::setStore (MessageStore* s) +//void QueueFactory::setStore (MessageStore* s) +void QueueFactory::setStore (AsyncStore* as) { - store = s; + asyncStore = as; } -MessageStore* QueueFactory::getStore() const +//MessageStore* QueueFactory::getStore() const +AsyncStore* QueueFactory::getStore() const { - return store; + return asyncStore; } void QueueFactory::setParent(management::Manageable* p) { Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFactory.h Mon Sep 24 13:49:13 2012 @@ -31,7 +31,8 @@ class Manageable; } namespace broker { class Broker; -class MessageStore; +//class MessageStore; +class AsyncStore; class Queue; struct QueueSettings; @@ -52,12 +53,14 @@ class QueueFactory /** * Set the store to use. May only be called once. */ - void setStore (MessageStore*); +// void setStore (MessageStore*); + void setStore (AsyncStore*); /** * Return the message store used. */ - MessageStore* getStore() const; +// MessageStore* getStore() const; + AsyncStore* getStore() const; /** * Register the manageable parent for declared queues @@ -65,7 +68,8 @@ class QueueFactory void setParent(management::Manageable*); private: Broker* broker; - MessageStore* store; +// MessageStore* store; + AsyncStore* asyncStore; management::Manageable* parent; }; }} // namespace qpid::broker Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.cpp (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.cpp Mon Sep 24 13:49:13 2012 @@ -23,9 +23,8 @@ #include "QueueHandle.h" -#include "PrivateImplRef.h" - #include "qpid/asyncStore/QueueHandleImpl.h" +#include "qpid/broker/PrivateImplRef.h" namespace qpid { namespace broker { Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp Mon Sep 24 13:49:13 2012 @@ -100,12 +100,14 @@ Queue::shared_ptr QueueRegistry::get(con return q; } -void QueueRegistry::setStore (MessageStore* _store) +//void QueueRegistry::setStore (MessageStore* _store) +void QueueRegistry::setStore (AsyncStore* _store) { QueueFactory::setStore(_store); } -MessageStore* QueueRegistry::getStore() const +//MessageStore* QueueRegistry::getStore() const +AsyncStore* QueueRegistry::getStore() const { return QueueFactory::getStore(); } Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h Mon Sep 24 13:49:13 2012 @@ -97,12 +97,14 @@ class QueueRegistry : QueueFactory { /** * Set the store to use. May only be called once. */ - void setStore (MessageStore*); +// void setStore (MessageStore*); + void setStore (AsyncStore*); /** * Return the message store used. */ - MessageStore* getStore() const; +// MessageStore* getStore() const; + AsyncStore* getStore() const; /** * Register the manageable parent for declared queues Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h Mon Sep 24 13:49:13 2012 @@ -23,7 +23,7 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" #include "qpid/broker/TxOp.h" #include <algorithm> Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h Mon Sep 24 13:49:13 2012 @@ -23,7 +23,7 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" #include "qpid/broker/TxOp.h" #include <algorithm> Added: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.cpp?rev=1389378&view=auto ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.cpp (added) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.cpp Mon Sep 24 13:49:13 2012 @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoveryAsyncContext.cpp + */ + +#include "RecoveryAsyncContext.h" + +namespace qpid { +namespace broker { + +RecoveryAsyncContext::RecoveryAsyncContext(RecoveryManagerImpl& rm, + AsyncResultCallback rcb, + AsyncResultQueue* const arq) : + m_rm(rm), + m_rcb(rcb), + m_arq(arq) +{} + +RecoveryAsyncContext::~RecoveryAsyncContext() {} + +RecoveryManagerImpl& +RecoveryAsyncContext::getRecoveryManager() const { + return m_rm; +} + + +AsyncResultQueue* +RecoveryAsyncContext::getAsyncResultQueue() const { + return m_arq; +} + +void +RecoveryAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const { + if (m_rcb) { + m_rcb(arh); + } +} + +}} // namespace qpid::broker Added: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.h?rev=1389378&view=auto ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.h (added) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryAsyncContext.h Mon Sep 24 13:49:13 2012 @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoveryAsyncContext.h + */ + +#ifndef qpid_broker_RecoveryAsyncContext_h_ +#define qpid_broker_RecoveryAsyncContext_h_ + +#include "qpid/broker/AsyncStore.h" + +namespace qpid { +namespace broker { +class AsyncResultHandle; +class RecoveryManagerImpl; + +typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); + +class RecoveryAsyncContext: public qpid::broker::BrokerAsyncContext { +public: + RecoveryAsyncContext(RecoveryManagerImpl& rm, + AsyncResultCallback rcb, + AsyncResultQueue* const arq); + virtual ~RecoveryAsyncContext(); + RecoveryManagerImpl& getRecoveryManager() const; + AsyncResultQueue* getAsyncResultQueue() const; + void invokeCallback(const AsyncResultHandle* const) const; + +private: + RecoveryManagerImpl& m_rm; + AsyncResultCallback m_rcb; + AsyncResultQueue* const m_arq; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_RecoveryAsyncContext_h_ Added: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.cpp?rev=1389378&view=auto ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.cpp (added) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.cpp Mon Sep 24 13:49:13 2012 @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoveryHandle.cpp + */ + +#include "RecoveryHandle.h" + +#include "qpid/asyncStore/RecoveryHandleImpl.h" +#include "qpid/broker/PrivateImplRef.h" + +namespace qpid { +namespace broker { + +typedef PrivateImplRef<RecoveryHandle> PrivateImpl; + +RecoveryHandle::RecoveryHandle(qpid::asyncStore::RecoveryHandleImpl* p) : + Handle<qpid::asyncStore::RecoveryHandleImpl>() +{ + PrivateImpl::ctor(*this, p); +} + +RecoveryHandle::RecoveryHandle(const RecoveryHandle& r) : + Handle<qpid::asyncStore::RecoveryHandleImpl>() +{ + PrivateImpl::copy(*this, r); +} + +RecoveryHandle::~RecoveryHandle() { + PrivateImpl::dtor(*this); +} + +RecoveryHandle& +RecoveryHandle::operator=(const RecoveryHandle& r) { + return PrivateImpl::assign(*this, r); +} + +}} // namespace qpid */ Added: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.h?rev=1389378&view=auto ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.h (added) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryHandle.h Mon Sep 24 13:49:13 2012 @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file RecoveryHandle.h + */ + +#ifndef qpid_broker_RecoveryHandle_h_ +#define qpid_broker_RecoveryHandle_h_ + +#include "qpid/asyncStore/AsyncStoreHandle.h" +#include "qpid/broker/Handle.h" + +namespace qpid { +namespace asyncStore { +class RecoveryHandleImpl; +} +namespace broker { + +class RecoveryHandle: public qpid::broker::Handle<qpid::asyncStore::RecoveryHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle +{ +public: + RecoveryHandle(qpid::asyncStore::RecoveryHandleImpl* p = 0); + RecoveryHandle(const RecoveryHandle& r); + virtual ~RecoveryHandle(); + RecoveryHandle& operator=(const RecoveryHandle& r); + + // --- RecoveryHandleImpl methods --- + // <none> + +private: + friend class PrivateImplRef<RecoveryHandle>; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_RecoveryHandle_h_ Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManager.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManager.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManager.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManager.h Mon Sep 24 13:49:13 2012 @@ -45,15 +45,18 @@ class RecoveryManager{ virtual void recoveryComplete() = 0; }; +// kpvdr: this has been replaced with AsyncRecoverable defined in AsyncStore.h +/* class Recoverable { public: virtual ~Recoverable() {} - /** + * * Request recovery of queue and message state. - */ + virtual void recover(RecoveryManager& recoverer) = 0; }; +*/ }} Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp Mon Sep 24 13:49:13 2012 @@ -155,7 +155,8 @@ void SemanticState::startTx() txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void SemanticState::commit(MessageStore* const store) +//void SemanticState::commit(MessageStore* const store) +void SemanticState::commit(AsyncStore* const store) { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h Mon Sep 24 13:49:13 2012 @@ -55,7 +55,8 @@ namespace qpid { namespace broker { class Exchange; -class MessageStore; +//class MessageStore; +class AsyncStore; class SessionContext; class SessionState; @@ -233,7 +234,8 @@ class SemanticState : private boost::non void stop(const std::string& destination); void startTx(); - void commit(MessageStore* const store); +// void commit(MessageStore* const store); + void commit(AsyncStore* const store); void rollback(); void selectDtx(); bool getDtxSelected() const { return dtxSelected; } Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp Mon Sep 24 13:49:13 2012 @@ -655,7 +655,9 @@ XaResult SessionAdapter::DtxHandlerImpl: DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover() { std::set<std::string> xids; - getBroker().getStore().collectPreparedXids(xids); +// getBroker().getStore().collectPreparedXids(xids); + // TODO: kpvdr: When designing async store with gsim, it was decided that this function + // would be performed outside the store. Resolve this function. /* * create array of long structs */ Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.cpp (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.cpp Mon Sep 24 13:49:13 2012 @@ -337,4 +337,11 @@ TopicExchange::~TopicExchange() {} const std::string TopicExchange::typeName("topic"); +// DataSource interface - used to write persistence data to async store +// TODO: kpvdr: implement +uint64_t TopicExchange::getSize() { + return 0; +} +void TopicExchange::write(char* /*target*/) {} + }} // namespace qpid::broker Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/TopicExchange.h Mon Sep 24 13:49:13 2012 @@ -109,6 +109,11 @@ public: QPID_BROKER_EXTERN virtual ~TopicExchange(); virtual bool supportsDynamicBinding() { return true; } + // DataSource interface - used to write persistence data to async store + uint64_t getSize(); + void write(char* target); + + class TopicExchangeTester; friend class TopicExchangeTester; }; Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp Mon Sep 24 13:49:13 2012 @@ -53,20 +53,22 @@ void TxBuffer::enlist(TxOp::shared_ptr o ops.push_back(op); } -bool TxBuffer::commitLocal(TransactionalStore* const store) +//bool TxBuffer::commitLocal(TransactionalStore* const store) +bool TxBuffer::commitLocal(AsyncTransactionalStore* const asyncTxnStore) { - if (!store) return false; + if (!asyncTxnStore) return false; try { - std::auto_ptr<TransactionContext> ctxt = store->begin(); - if (prepare(ctxt.get())) { - store->commit(*ctxt); - commit(); - return true; - } else { - store->abort(*ctxt); - rollback(); - return false; - } +// std::auto_ptr<TransactionContext> ctxt = asyncTxnStore->begin(); +// if (prepare(ctxt.get())) { +// asyncTxnStore->commit(*ctxt); +// commit(); +// return true; +// } else { +// asyncTxnStore->abort(*ctxt); +// rollback(); +// return false; +// } + // TODO: kpvdr: add async local transaction commits here } catch (std::exception& e) { QPID_LOG(error, "Commit failed with exception: " << e.what()); } catch (...) { Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h Mon Sep 24 13:49:13 2012 @@ -59,6 +59,8 @@ */ namespace qpid { namespace broker { + class AsyncTransactionalStore; + class TxBuffer{ typedef std::vector<TxOp::shared_ptr>::iterator op_iterator; std::vector<TxOp::shared_ptr> ops; @@ -107,7 +109,8 @@ namespace qpid { * Helper method for managing the process of server local * commit */ - QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store); +// QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store); + QPID_BROKER_EXTERN bool commitLocal(AsyncTransactionalStore* const store); }; } } Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp (original) +++ qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Sep 24 13:49:13 2012 @@ -620,6 +620,10 @@ bool BrokerReplicator::bind(boost::share bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } +// DataSource interface - used to write persistence data to async store +uint64_t BrokerReplicator::getSize() { return 0; } +void BrokerReplicator::write(char* /*target*/) {} + string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } }} // namespace broker Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.h Mon Sep 24 13:49:13 2012 @@ -76,6 +76,10 @@ class BrokerReplicator : public broker:: void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + // DataSource interface - used to write persistence data to async store + uint64_t getSize(); + void write(char* target); + private: typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr; Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp (original) +++ qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp Mon Sep 24 13:49:13 2012 @@ -195,4 +195,8 @@ bool QueueReplicator::unbind(boost::shar bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; } std::string QueueReplicator::getType() const { return TYPE_NAME; } +// DataSource interface - used to write persistence data to async store +uint64_t QueueReplicator::getSize() { return 0; } +void QueueReplicator::write(char* /*target*/) {} + }} // namespace qpid::broker Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.h Mon Sep 24 13:49:13 2012 @@ -77,6 +77,10 @@ class QueueReplicator : public broker::E void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + // DataSource interface - used to write persistence data to async store + uint64_t getSize(); + void write(char* target); + private: void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&); Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.h Mon Sep 24 13:49:13 2012 @@ -48,6 +48,10 @@ class ManagementDirectExchange : public void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); virtual ~ManagementDirectExchange(); + + // DataSource interface - used to write persistence data to async store + uint64_t getSize() { return 0; } + void write(char* /*target*/) {} }; Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.h Mon Sep 24 13:49:13 2012 @@ -52,6 +52,11 @@ class ManagementTopicExchange : public v void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); virtual ~ManagementTopicExchange(); + + // DataSource interface - used to write persistence data to async store + uint64_t getSize() { return 0; } + void write(char* /*target*/) {} + }; Modified: qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp (original) +++ qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp Mon Sep 24 13:49:13 2012 @@ -101,7 +101,8 @@ MessageStorePlugin::earlyInitialize (qpi provider->second->activate(*this); NoopDeleter d; boost::shared_ptr<qpid::broker::MessageStore> sp(this, d); - broker->setStore(sp); +// broker->setStore(sp); + // TODO: kpvdr: Windows store earlyInitialize() target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this)); } Modified: qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp (original) +++ qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp Mon Sep 24 13:49:13 2012 @@ -430,6 +430,11 @@ bool XmlExchange::MatchQueueAndOrigin::o const std::string XmlExchange::typeName("xml"); - + + +// DataSource interface - used to write persistence data to async store +uint64_t XmlExchange::getSize() { return 0; } +void XmlExchange::write(char* /*target*/) {} + } } Modified: qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h (original) +++ qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h Mon Sep 24 13:49:13 2012 @@ -94,6 +94,10 @@ class XmlExchange : public virtual Excha virtual ~XmlExchange(); + // DataSource interface - used to write persistence data to async store + uint64_t getSize(); + void write(char* target); + struct MatchOrigin { const std::string origin; MatchOrigin(const std::string& origin); Modified: qpid/branches/asyncstore/cpp/src/tests/AsyncCompletion.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/AsyncCompletion.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/tests/AsyncCompletion.cpp (original) +++ qpid/branches/asyncstore/cpp/src/tests/AsyncCompletion.cpp Mon Sep 24 13:49:13 2012 @@ -16,33 +16,34 @@ * */ +// TODO: kpvdr: Rewrite this test in terms of an Null AsyncStore -#include "unit_test.h" -#include "test_tools.h" -#include "BrokerFixture.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid/sys/BlockingQueue.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/sys/Time.h" -#include "qpid/framing/QueueQueryResult.h" -#include "qpid/client/TypedResult.h" - -using namespace std; -using namespace qpid; -using namespace client; -using namespace framing; - -namespace qpid { namespace broker { -class TransactionContext; -class PersistableQueue; -}} - -using broker::PersistableMessage; -using broker::NullMessageStore; -using broker::TransactionContext; -using broker::PersistableQueue; -using sys::TIME_SEC; -using boost::intrusive_ptr; +//#include "unit_test.h" +//#include "test_tools.h" +//#include "BrokerFixture.h" +//#include "qpid/broker/NullMessageStore.h" +//#include "qpid/sys/BlockingQueue.h" +//#include "qpid/client/AsyncSession.h" +//#include "qpid/sys/Time.h" +//#include "qpid/framing/QueueQueryResult.h" +//#include "qpid/client/TypedResult.h" +// +//using namespace std; +//using namespace qpid; +//using namespace client; +//using namespace framing; +// +//namespace qpid { namespace broker { +//class TransactionContext; +//class PersistableQueue; +//}} +// +//using broker::PersistableMessage; +//using broker::NullMessageStore; +//using broker::TransactionContext; +//using broker::PersistableQueue; +//using sys::TIME_SEC; +//using boost::intrusive_ptr; /** @file Unit tests for async completion. * Using a dummy store, verify that the broker indicates async completion of @@ -52,6 +53,7 @@ using boost::intrusive_ptr; namespace qpid { namespace tests { +/* class AsyncCompletionMessageStore : public NullMessageStore { public: sys::BlockingQueue<boost::intrusive_ptr<PersistableMessage> > enqueued; @@ -72,8 +74,10 @@ QPID_AUTO_TEST_SUITE(AsyncCompletionTest QPID_AUTO_TEST_CASE(testWaitTillComplete) { SessionFixture fix; AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore; - boost::shared_ptr<qpid::broker::MessageStore> p; - p.reset(store); +// boost::shared_ptr<qpid::broker::MessageStore> p; + boost::shared_ptr<qpid::broker::AsyncStore> p; +// p.reset(store); + // TODO: kpvdr: Rewrite this test to use AsyncStore fix.broker->setStore(p); AsyncSession s = fix.session; @@ -116,5 +120,6 @@ QPID_AUTO_TEST_CASE(testGetResult) { } QPID_AUTO_TEST_SUITE_END() +*/ }} // namespace qpid::tests Modified: qpid/branches/asyncstore/cpp/src/tests/DtxWorkRecordTest.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/DtxWorkRecordTest.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/tests/DtxWorkRecordTest.cpp (original) +++ qpid/branches/asyncstore/cpp/src/tests/DtxWorkRecordTest.cpp Mon Sep 24 13:49:13 2012 @@ -32,6 +32,7 @@ namespace tests { QPID_AUTO_TEST_SUITE(DtxWorkRecordTestSuite) +/* QPID_AUTO_TEST_CASE(testOnePhaseCommit){ MockTransactionalStore store; store.expectBegin().expectCommit(); @@ -187,6 +188,8 @@ QPID_AUTO_TEST_CASE(testRollback){ opA->check(); opB->check(); } +*/ +// TODO: kpvdr: Rewrite this test (and TxMocks.h) to use Async store QPID_AUTO_TEST_SUITE_END() Modified: qpid/branches/asyncstore/cpp/src/tests/QueueTest.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/QueueTest.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/tests/QueueTest.cpp (original) +++ qpid/branches/asyncstore/cpp/src/tests/QueueTest.cpp Mon Sep 24 13:49:13 2012 @@ -29,7 +29,7 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/NullMessageStore.h" +//#include "qpid/broker/NullMessageStore.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/FieldTable.h" Modified: qpid/branches/asyncstore/cpp/src/tests/TxBufferTest.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/TxBufferTest.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/tests/TxBufferTest.cpp (original) +++ qpid/branches/asyncstore/cpp/src/tests/TxBufferTest.cpp Mon Sep 24 13:49:13 2012 @@ -32,6 +32,7 @@ namespace tests { QPID_AUTO_TEST_SUITE(TxBufferTestSuite) +/* QPID_AUTO_TEST_CASE(testCommitLocal) { MockTransactionalStore store; @@ -175,6 +176,8 @@ QPID_AUTO_TEST_CASE(testBufferIsClearedA opA->check(); opB->check(); } +*/ +// TODO: kpvdr: Rewrite this test (and TxMocks.h) to use Async store QPID_AUTO_TEST_SUITE_END() Modified: qpid/branches/asyncstore/cpp/src/tests/test_store.cpp URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/test_store.cpp?rev=1389378&r1=1389377&r2=1389378&view=diff ============================================================================== --- qpid/branches/asyncstore/cpp/src/tests/test_store.cpp (original) +++ qpid/branches/asyncstore/cpp/src/tests/test_store.cpp Mon Sep 24 13:49:13 2012 @@ -19,7 +19,7 @@ * */ - +// TODO: kpvdr: Rewrite this test in terms of an Null AsyncStore /**@file * Plug-in message store for tests. * @@ -32,26 +32,26 @@ * - do async completion after a delay. */ -#include "qpid/broker/NullMessageStore.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/amqp_0_10/MessageTransfer.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/log/Statement.h" -#include "qpid/Plugin.h" -#include "qpid/Options.h" -#include <boost/cast.hpp> -#include <boost/lexical_cast.hpp> -#include <memory> -#include <fstream> - -using namespace qpid; -using namespace broker; -using namespace std; -using namespace qpid::sys; +//#include "qpid/broker/NullMessageStore.h" +//#include "qpid/broker/Broker.h" +//#include "qpid/broker/amqp_0_10/MessageTransfer.h" +//#include "qpid/framing/AMQFrame.h" +//#include "qpid/log/Statement.h" +//#include "qpid/Plugin.h" +//#include "qpid/Options.h" +//#include <boost/cast.hpp> +//#include <boost/lexical_cast.hpp> +//#include <memory> +//#include <fstream> +// +//using namespace qpid; +//using namespace broker; +//using namespace std; +//using namespace qpid::sys; namespace qpid { namespace tests { - +/* struct TestStoreOptions : public Options { string name; @@ -76,6 +76,7 @@ struct Completer : public Runnable { } }; + class TestStore : public NullMessageStore { public: TestStore(const TestStoreOptions& opts, Broker& broker_) @@ -157,6 +158,7 @@ const string TestStore::EXCEPTION = "exc const string TestStore::EXIT_PROCESS = "exit_process"; const string TestStore::ASYNC="async "; + struct TestStorePlugin : public Plugin { TestStoreOptions options; @@ -168,12 +170,13 @@ struct TestStorePlugin : public Plugin { Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; boost::shared_ptr<MessageStore> p(new TestStore(options, *broker)); - broker->setStore (p); +// broker->setStore (p); + // TODO: kpvdr: This test will need to be reworked in terms of an AsyncStore. } void initialize(qpid::Plugin::Target&) {} }; static TestStorePlugin pluginInstance; - +*/ }} // namespace qpid::tests --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
