Author: aconway
Date: Mon May  4 17:22:33 2009
New Revision: 771366

URL: http://svn.apache.org/viewvc?rev=771366&view=rev
Log:
Applied PIMPL pattern to SubscriptionManager.

Cleaned up some sloppy #includes.

Added:
    qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/client/LocalQueueImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/PrivateImplRef.h
    qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
    qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
    qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    qpid/trunk/qpid/cpp/src/tests/echotest.cpp
    qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
    qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
    qpid/trunk/qpid/cpp/src/tests/perftest.cpp
    qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
    qpid/trunk/qpid/cpp/src/tests/txshift.cpp
    qpid/trunk/qpid/cpp/src/tests/txtest.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon May  4 17:22:33 2009
@@ -470,7 +470,10 @@
   qpid/client/StateManager.cpp                 \
   qpid/client/Subscription.cpp                 \
   qpid/client/SubscriptionImpl.cpp             \
-  qpid/client/SubscriptionManager.cpp
+  qpid/client/SubscriptionImpl.h               \
+  qpid/client/SubscriptionManager.cpp          \
+  qpid/client/SubscriptionManagerImpl.cpp      \
+  qpid/client/SubscriptionManagerImpl.h
 
 nobase_include_HEADERS = \
   $(platform_hdr) \
@@ -595,7 +598,6 @@
   qpid/client/Execution.h \
   qpid/client/FailoverManager.h \
   qpid/client/Subscription.h \
-  qpid/client/SubscriptionImpl.h \
   qpid/client/SubscriptionSettings.h \
   qpid/client/FlowControl.h \
   qpid/client/Future.h \
@@ -618,6 +620,29 @@
   qpid/client/StateManager.h \
   qpid/client/SubscriptionManager.h \
   qpid/client/TypedResult.h \
+  qpid/client/api/Connection.h \
+  qpid/client/api/ConnectionImpl.h \
+  qpid/client/api/Destination.h \
+  qpid/client/api/Message.h \
+  qpid/client/api/MessageImpl.h \
+  qpid/client/api/MessageListener.h \
+  qpid/client/api/MessageProducer.h \
+  qpid/client/api/MessageProducerImpl.h \
+  qpid/client/api/MessageReceiver.h \
+  qpid/client/api/MessageReceiverImpl.h \
+  qpid/client/api/MessageSink.h \
+  qpid/client/api/MessageSource.h \
+  qpid/client/api/Queue.h \
+  qpid/client/api/Session.h \
+  qpid/client/api/SessionImpl.h \
+  qpid/client/api/Subscription.h \
+  qpid/client/api/SubscriptionImpl.h \
+  qpid/client/api/Topic.h \
+  qpid/client/api/Variant.h \
+  qpid/client/amqp0_10/MessageAdapter.h \
+  qpid/client/amqp0_10/Sinks.h \
+  qpid/client/amqp0_10/Sources.h \
+  qpid/client/amqp0_10/SessionImpl.h \
   qpid/framing/AMQBody.h \
   qpid/framing/AMQCommandControlBody.h \
   qpid/framing/AMQContentBody.h \

Modified: qpid/trunk/qpid/cpp/src/qpid/client/LocalQueueImpl.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueueImpl.h?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/LocalQueueImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/LocalQueueImpl.h Mon May  4 17:22:33 
2009
@@ -100,7 +100,7 @@
   private:
     Demux::QueuePtr queue;
     Subscription subscription;
-  friend class SubscriptionManager;
+  friend class SubscriptionManagerImpl;
 };
 
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/PrivateImplRef.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/PrivateImplRef.h?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/PrivateImplRef.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/PrivateImplRef.h Mon May  4 17:22:33 
2009
@@ -76,14 +76,15 @@
     static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); }
 
     static void set(T& t, const intrusive_ptr& p) {
-        if(t.impl) boost::intrusive_ptr_release(t.impl); 
+        if (t.impl == p) return;
+        if (t.impl) boost::intrusive_ptr_release(t.impl); 
         t.impl = p.get();
         if (t.impl) boost::intrusive_ptr_add_ref(t.impl); 
     }
 
     // Helper functions to implement the ctor, dtor, copy, assign
     static void ctor(T& t, Impl* p) { t.impl = p; if (p) 
boost::intrusive_ptr_add_ref(p); }
-    static void copy(T& t, const T& x) { t.impl = 0; assign(t, x); }
+    static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; 
assign(t, x); }
     static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); }
     static T& assign(T& t, const T& x) { set(t, get(x)); return t;}
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp Mon May  4 17:22:33 
2009
@@ -46,7 +46,7 @@
 void Subscription::accept(const SequenceSet& messageIds) { 
impl->accept(messageIds); }
 void Subscription::release(const SequenceSet& messageIds) { 
impl->release(messageIds); }
 Session Subscription::getSession() const { return impl->getSession(); }
-SubscriptionManager&Subscription:: getSubscriptionManager() const { return 
impl->getSubscriptionManager(); }
+SubscriptionManager Subscription::getSubscriptionManager() { return 
impl->getSubscriptionManager(); }
 void Subscription::cancel() { impl->cancel(); }
 void Subscription::grantMessageCredit(uint32_t value) { 
impl->grantCredit(framing::message::CREDIT_UNIT_MESSAGE, value); }
 void Subscription::grantByteCredit(uint32_t value) { 
impl->grantCredit(framing::message::CREDIT_UNIT_BYTE, value); }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h Mon May  4 17:22:33 2009
@@ -103,7 +103,7 @@
     QPID_CLIENT_EXTERN Session getSession() const;
 
     /** Get the subscription manager associated with this subscription */
-    QPID_CLIENT_EXTERN SubscriptionManager& getSubscriptionManager() const;
+    QPID_CLIENT_EXTERN SubscriptionManager getSubscriptionManager();
 
     /** Cancel the subscription. */
     QPID_CLIENT_EXTERN void cancel();

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp Mon May  4 
17:22:33 2009
@@ -19,11 +19,14 @@
  *
  */
 
+#include "AsyncSession.h"
 #include "SubscriptionImpl.h"
+#include "SubscriptionManagerImpl.h"
 #include "MessageImpl.h"
 #include "CompletionImpl.h"
 #include "SubscriptionManager.h"
 #include "SubscriptionSettings.h"
+#include "PrivateImplRef.h"
 
 namespace qpid {
 namespace client {
@@ -31,8 +34,8 @@
 using sys::Mutex;
 using framing::MessageAcquireResult;
 
-SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& 
q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
-    : manager(m), name(n), queue(q), settings(s), listener(l)
+SubscriptionImpl::SubscriptionImpl(SubscriptionManager m, const std::string& 
q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
+    : manager(*PrivateImplRef<SubscriptionManager>::get(m)), name(n), 
queue(q), settings(s), listener(l)
 {}
 
 void SubscriptionImpl::subscribe()
@@ -110,7 +113,7 @@
 
 Session SubscriptionImpl::getSession() const { return manager.getSession(); }
 
-SubscriptionManager& SubscriptionImpl::getSubscriptionManager() const { return 
manager; }
+SubscriptionManager SubscriptionImpl::getSubscriptionManager() { return 
SubscriptionManager(&manager); }
 
 void SubscriptionImpl::cancel() { manager.cancel(name); }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h Mon May  4 17:22:33 
2009
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/client/SubscriptionSettings.h"
+#include "qpid/client/SubscriptionManager.h"
 #include "qpid/client/Session.h"
 #include "qpid/client/MessageListener.h"
 #include "qpid/client/Demux.h"
@@ -37,11 +38,12 @@
 namespace client {
 
 class SubscriptionManager;
+class SubscriptionManagerImpl;
 
 class SubscriptionImpl : public RefCounted, public MessageListener {
   public:
-    QPID_CLIENT_EXTERN SubscriptionImpl(SubscriptionManager&, const 
std::string& queue,
-                     const SubscriptionSettings&, const std::string& name, 
MessageListener* =0);
+    QPID_CLIENT_EXTERN SubscriptionImpl(SubscriptionManager, const 
std::string& queue,
+        const SubscriptionSettings&, const std::string& name, MessageListener* 
=0);
     
     /** The name of the subsctription, used as the "destination" for messages 
from the broker.
      * Usually the same as the queue name but can be set differently.
@@ -84,7 +86,7 @@
     QPID_CLIENT_EXTERN Session getSession() const;
 
     /** Get the subscription manager associated with this subscription */
-    QPID_CLIENT_EXTERN SubscriptionManager& getSubscriptionManager() const;
+    QPID_CLIENT_EXTERN SubscriptionManager getSubscriptionManager();
 
     /** Send subscription request and issue appropriate flow control commands. 
*/
     QPID_CLIENT_EXTERN void subscribe();
@@ -110,7 +112,7 @@
   private:
 
     mutable sys::Mutex lock;
-    SubscriptionManager& manager;
+    SubscriptionManagerImpl& manager;
     std::string name, queue;
     SubscriptionSettings settings;
     framing::SequenceSet unacquired, unaccepted;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Mon May  4 
17:22:33 2009
@@ -18,144 +18,85 @@
  * under the License.
  *
  */
-#ifndef _Subscription_
-#define _Subscription_
 
 #include "SubscriptionManager.h"
-#include "SubscriptionImpl.h"
-#include "LocalQueueImpl.h"
+#include "SubscriptionManagerImpl.h"
 #include "PrivateImplRef.h"
-#include <qpid/client/Dispatcher.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/framing/Uuid.h>
-#include <set>
-#include <sstream>
 
 
 namespace qpid {
 namespace client {
 
-SubscriptionManager::SubscriptionManager(const Session& s)
-  : dispatcher(s), session(s), autoStop(true)
-{}
+typedef PrivateImplRef<SubscriptionManager> PI;
+
+SubscriptionManager::SubscriptionManager(const Session& s) { PI::ctor(*this, 
new SubscriptionManagerImpl(s)); }
+SubscriptionManager::SubscriptionManager(SubscriptionManagerImpl* i) { 
PI::ctor(*this, i); }
+SubscriptionManager::SubscriptionManager(const SubscriptionManager& x) : 
Handle<SubscriptionManagerImpl>() { PI::copy(*this, x); }
+SubscriptionManager::~SubscriptionManager() { PI::dtor(*this); }
+SubscriptionManager& SubscriptionManager::operator=(const SubscriptionManager& 
x) { return PI::assign(*this, x); }
 
 Subscription SubscriptionManager::subscribe(
     MessageListener& listener, const std::string& q, const 
SubscriptionSettings& ss, const std::string& n)
-{
-    sys::Mutex::ScopedLock l(lock);
-    std::string name=n.empty() ? q:n;
-    boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, 
ss, name, &listener);
-    dispatcher.listen(si);
-    //issue subscription request after listener is registered with dispatcher
-    si->subscribe();
-    return subscriptions[name] = Subscription(si.get());
-}
+{ return impl->subscribe(listener, q, ss, n); }
 
 Subscription SubscriptionManager::subscribe(
     LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, 
const std::string& n)
-{
-    sys::Mutex::ScopedLock l(lock);
-    std::string name=n.empty() ? q:n;
-    boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, 
ss, name, 0);
-    boost::intrusive_ptr<LocalQueueImpl> lqi = 
PrivateImplRef<LocalQueue>::get(lq);
-    lqi->queue=si->divert();
-    si->subscribe();
-    lqi->subscription = Subscription(si.get());
-    return subscriptions[name] = lqi->subscription;
-}
+{ return impl->subscribe(lq, q, ss, n); }
+
 
 Subscription SubscriptionManager::subscribe(
     MessageListener& listener, const std::string& q, const std::string& n)
-{
-    return subscribe(listener, q, defaultSettings, n);
-}
+{ return impl->subscribe(listener, q, n); }
+
 
 Subscription SubscriptionManager::subscribe(
     LocalQueue& lq, const std::string& q, const std::string& n)
-{
-    return subscribe(lq, q, defaultSettings, n);
-}
+{ return impl->subscribe(lq, q, n); }
 
-void SubscriptionManager::cancel(const std::string& dest)
-{
-    sys::Mutex::ScopedLock l(lock);
-    std::map<std::string, Subscription>::iterator i = subscriptions.find(dest);
-    if (i != subscriptions.end()) {
-        sync(session).messageCancel(dest);
-        dispatcher.cancel(dest);
-        Subscription s = i->second;
-        if (s.isValid()) s.impl->cancelDiversion();
-        subscriptions.erase(i);
-    }
-}
+void SubscriptionManager::cancel(const std::string& dest) { return 
impl->cancel(dest); }
 
-void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }
+void SubscriptionManager::setAutoStop(bool set) { impl->setAutoStop(set); }
 
-void SubscriptionManager::run()
-{
-    dispatcher.setAutoStop(autoStop);
-    dispatcher.run();
-}
+void SubscriptionManager::run() { impl->run(); }
 
-void SubscriptionManager::start()
-{
-    dispatcher.setAutoStop(autoStop);
-    dispatcher.start();
-}
+void SubscriptionManager::start() { impl->start(); }
 
-void SubscriptionManager::wait()
-{
-    dispatcher.wait();
-}
+void SubscriptionManager::wait() { impl->wait(); }
 
-void SubscriptionManager::stop()
-{
-    dispatcher.stop();
-}
+void SubscriptionManager::stop() { impl->stop(); }
 
 bool SubscriptionManager::get(Message& result, const std::string& queue, 
sys::Duration timeout) {
-    LocalQueue lq;
-    std::string unique = framing::Uuid(true).str();
-    subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), 
unique);
-    AutoCancel ac(*this, unique);
-    //first wait for message to be delivered if a timeout has been specified
-    if (timeout && lq.get(result, timeout))
-        return true;
-    //make sure message is not on queue before final check
-    sync(session).messageFlush(unique);
-    return lq.get(result, 0);
+    return impl->get(result, queue, timeout);
 }
 
 Message SubscriptionManager::get(const std::string& queue, sys::Duration 
timeout) {
-    Message result;
-    if (!get(result, queue, timeout))
-        throw Exception("Timed out waiting for a message");
-    return result;
+    return impl->get(queue, timeout);
 }
 
-Session SubscriptionManager::getSession() const { return session; }
+Session SubscriptionManager::getSession() const { return impl->getSession(); }
 
 Subscription SubscriptionManager::getSubscription(const std::string& name) 
const {
-    sys::Mutex::ScopedLock l(lock);
-    std::map<std::string, Subscription>::const_iterator i = 
subscriptions.find(name);
-    if (i == subscriptions.end())
-        throw Exception(QPID_MSG("Subscription not found: " << name));
-    return i->second;
+    return impl->getSubscription(name);
 }
-
 void SubscriptionManager::registerFailoverHandler (boost::function<void ()> 
fh) {
-    dispatcher.registerFailoverHandler(fh);
+    impl->registerFailoverHandler(fh); 
 }
 
 void SubscriptionManager::setFlowControl(const std::string& name, const 
FlowControl& flow) {
-    getSubscription(name).setFlowControl(flow);
+    impl->setFlowControl(name, flow);
 }
 
 void SubscriptionManager::setFlowControl(const std::string& name, uint32_t 
messages,  uint32_t bytes, bool window) {
-    setFlowControl(name, FlowControl(messages, bytes, window));
+    impl->setFlowControl(name, FlowControl(messages, bytes, window));
 }
 
+void SubscriptionManager::setFlowControl(uint32_t messages,  uint32_t bytes, 
bool window) {
+    impl->setFlowControl(messages, bytes, window);
+}
+
+void SubscriptionManager::setAcceptMode(AcceptMode mode) { 
impl->setAcceptMode(mode); }
+void SubscriptionManager::setAcquireMode(AcquireMode mode) { 
impl->setAcquireMode(mode); }
+
 }} // namespace qpid::client
 
-#endif
+

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Mon May  4 
17:22:33 2009
@@ -21,22 +21,21 @@
  * under the License.
  *
  */
-#include "qpid/sys/Mutex.h"
-#include <qpid/client/Dispatcher.h>
-#include <qpid/client/Completion.h>
+
 #include <qpid/client/Session.h>
-#include <qpid/client/AsyncSession.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/client/LocalQueue.h>
 #include <qpid/client/Subscription.h>
 #include <qpid/sys/Runnable.h>
 #include <qpid/client/ClientImportExport.h>
-#include <set>
-#include <sstream>
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/LocalQueue.h"
+#include <qpid/client/Handle.h>
+#include <string>
 
 namespace qpid {
 namespace client {
 
+class SubscriptionManagerImpl;
+
 /**
  * A class to help create and manage subscriptions.
  * 
@@ -94,11 +93,14 @@
  * </ul>
  * 
  */
-class SubscriptionManager : public sys::Runnable
+class SubscriptionManager : public sys::Runnable, public 
Handle<SubscriptionManagerImpl>
 {
   public:
     /** Create a new SubscriptionManager associated with a session */
     QPID_CLIENT_EXTERN SubscriptionManager(const Session& session);
+    QPID_CLIENT_EXTERN SubscriptionManager(const SubscriptionManager&);
+    QPID_CLIENT_EXTERN ~SubscriptionManager();
+    QPID_CLIENT_EXTERN SubscriptionManager& operator=(const 
SubscriptionManager&);
     
     /**
      * Subscribe a MessagesListener to receive messages from queue.
@@ -224,17 +226,17 @@
     /** Set the default settings for subscribe() calls that don't
      * include a SubscriptionSettings parameter.
      */
-    QPID_CLIENT_EXTERN void setDefaultSettings(const SubscriptionSettings& s) 
{ defaultSettings = s; }
+    QPID_CLIENT_EXTERN void setDefaultSettings(const SubscriptionSettings& s);
 
     /** Get the default settings for subscribe() calls that don't
      * include a SubscriptionSettings parameter.
      */
-    QPID_CLIENT_EXTERN const SubscriptionSettings& getDefaultSettings() const 
{ return defaultSettings; }
+    QPID_CLIENT_EXTERN const SubscriptionSettings& getDefaultSettings() const;
 
     /** Get the default settings for subscribe() calls that don't
      * include a SubscriptionSettings parameter.
      */
-    QPID_CLIENT_EXTERN SubscriptionSettings& getDefaultSettings() { return 
defaultSettings; }
+    QPID_CLIENT_EXTERN SubscriptionSettings& getDefaultSettings();
 
     /**
      * Set the default flow control settings for subscribe() calls
@@ -244,32 +246,28 @@
     *@param bytes: byte credit.
     *@param window: if true use window-based flow control.
      */
-    QPID_CLIENT_EXTERN void setFlowControl(uint32_t messages,  uint32_t bytes, 
bool window=true) {
-        defaultSettings.flowControl = FlowControl(messages, bytes, window);
-    }
+    QPID_CLIENT_EXTERN void setFlowControl(uint32_t messages,  uint32_t bytes, 
bool window=true);
 
     /**
      *Set the default accept-mode for subscribe() calls that don't
      *include a SubscriptionSettings parameter.
      */
-    QPID_CLIENT_EXTERN void setAcceptMode(AcceptMode mode) { 
defaultSettings.acceptMode = mode; }
+    QPID_CLIENT_EXTERN void setAcceptMode(AcceptMode mode);
 
     /**
      * Set the default acquire-mode subscribe()s that don't specify 
SubscriptionSettings.
      */
-    QPID_CLIENT_EXTERN void setAcquireMode(AcquireMode mode) { 
defaultSettings.acquireMode = mode; }
+    QPID_CLIENT_EXTERN void setAcquireMode(AcquireMode mode);
 
     QPID_CLIENT_EXTERN void registerFailoverHandler ( boost::function<void ()> 
fh );
 
     QPID_CLIENT_EXTERN Session getSession() const;
 
+    SubscriptionManager(SubscriptionManagerImpl*); ///<@internal
+
   private:
-    mutable sys::Mutex lock;
-    qpid::client::Dispatcher dispatcher;
-    qpid::client::AsyncSession session;
-    bool autoStop;
-    SubscriptionSettings defaultSettings;
-    std::map<std::string, Subscription> subscriptions;
+    typedef SubscriptionManagerImpl Impl;
+    friend class PrivateImplRef<SubscriptionManager>;
 };
 
 /** AutoCancel cancels a subscription in its destructor */

Added: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.cpp?rev=771366&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.cpp Mon May  4 
17:22:33 2009
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SubscriptionManager.h"
+#include "SubscriptionManagerImpl.h"
+#include "SubscriptionImpl.h"
+#include "LocalQueueImpl.h"
+#include "PrivateImplRef.h"
+#include <qpid/client/Dispatcher.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/framing/Uuid.h>
+#include <set>
+#include <sstream>
+
+
+namespace qpid {
+namespace client {
+
+SubscriptionManagerImpl::SubscriptionManagerImpl(const Session& s)
+  : dispatcher(s), session(s), autoStop(true)
+{}
+
+Subscription SubscriptionManagerImpl::subscribe(
+    MessageListener& listener, const std::string& q, const 
SubscriptionSettings& ss, const std::string& n)
+{
+    sys::Mutex::ScopedLock l(lock);
+    std::string name=n.empty() ? q:n;
+    boost::intrusive_ptr<SubscriptionImpl> si = new 
SubscriptionImpl(SubscriptionManager(this), q, ss, name, &listener);
+    dispatcher.listen(si);
+    //issue subscription request after listener is registered with dispatcher
+    si->subscribe();
+    return subscriptions[name] = Subscription(si.get());
+}
+
+Subscription SubscriptionManagerImpl::subscribe(
+    LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, 
const std::string& n)
+{
+    sys::Mutex::ScopedLock l(lock);
+    std::string name=n.empty() ? q:n;
+    boost::intrusive_ptr<SubscriptionImpl> si = new 
SubscriptionImpl(SubscriptionManager(this), q, ss, name, 0);
+    boost::intrusive_ptr<LocalQueueImpl> lqi = 
PrivateImplRef<LocalQueue>::get(lq);
+    lqi->queue=si->divert();
+    si->subscribe();
+    lqi->subscription = Subscription(si.get());
+    return subscriptions[name] = lqi->subscription;
+}
+
+Subscription SubscriptionManagerImpl::subscribe(
+    MessageListener& listener, const std::string& q, const std::string& n)
+{
+    return subscribe(listener, q, defaultSettings, n);
+}
+
+Subscription SubscriptionManagerImpl::subscribe(
+    LocalQueue& lq, const std::string& q, const std::string& n)
+{
+    return subscribe(lq, q, defaultSettings, n);
+}
+
+void SubscriptionManagerImpl::cancel(const std::string& dest)
+{
+    sys::Mutex::ScopedLock l(lock);
+    std::map<std::string, Subscription>::iterator i = subscriptions.find(dest);
+    if (i != subscriptions.end()) {
+        sync(session).messageCancel(dest);
+        dispatcher.cancel(dest);
+        Subscription s = i->second;
+        if (s.isValid())
+            PrivateImplRef<Subscription>::get(s)->cancelDiversion();
+        subscriptions.erase(i);
+    }
+}
+
+void SubscriptionManagerImpl::setAutoStop(bool set) { autoStop=set; }
+
+void SubscriptionManagerImpl::run()
+{
+    dispatcher.setAutoStop(autoStop);
+    dispatcher.run();
+}
+
+void SubscriptionManagerImpl::start()
+{
+    dispatcher.setAutoStop(autoStop);
+    dispatcher.start();
+}
+
+void SubscriptionManagerImpl::wait()
+{
+    dispatcher.wait();
+}
+
+void SubscriptionManagerImpl::stop()
+{
+    dispatcher.stop();
+}
+
+bool SubscriptionManagerImpl::get(Message& result, const std::string& queue, 
sys::Duration timeout) {
+    LocalQueue lq;
+    std::string unique = framing::Uuid(true).str();
+    subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), 
unique);
+    SubscriptionManager sm(this);
+    AutoCancel ac(sm, unique);
+    //first wait for message to be delivered if a timeout has been specified
+    if (timeout && lq.get(result, timeout))
+        return true;
+    //make sure message is not on queue before final check
+    sync(session).messageFlush(unique);
+    return lq.get(result, 0);
+}
+
+Message SubscriptionManagerImpl::get(const std::string& queue, sys::Duration 
timeout) {
+    Message result;
+    if (!get(result, queue, timeout))
+        throw Exception("Timed out waiting for a message");
+    return result;
+}
+
+Session SubscriptionManagerImpl::getSession() const { return session; }
+
+Subscription SubscriptionManagerImpl::getSubscription(const std::string& name) 
const {
+    sys::Mutex::ScopedLock l(lock);
+    std::map<std::string, Subscription>::const_iterator i = 
subscriptions.find(name);
+    if (i == subscriptions.end())
+        throw Exception(QPID_MSG("Subscription not found: " << name));
+    return i->second;
+}
+
+void SubscriptionManagerImpl::registerFailoverHandler (boost::function<void 
()> fh) {
+    dispatcher.registerFailoverHandler(fh);
+}
+
+void SubscriptionManagerImpl::setFlowControl(const std::string& name, const 
FlowControl& flow) {
+    getSubscription(name).setFlowControl(flow);
+}
+
+void SubscriptionManagerImpl::setFlowControl(const std::string& name, uint32_t 
messages,  uint32_t bytes, bool window) {
+    setFlowControl(name, FlowControl(messages, bytes, window));
+}
+
+}} // namespace qpid::client
+
+

Propchange: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.h?rev=771366&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.h Mon May  4 
17:22:33 2009
@@ -0,0 +1,278 @@
+#ifndef QPID_CLIENT_SUBSCRIPTIONMANAGERIMPL_H
+#define QPID_CLIENT_SUBSCRIPTIONMANAGERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Mutex.h"
+#include <qpid/client/Dispatcher.h>
+#include <qpid/client/Completion.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/LocalQueue.h>
+#include <qpid/client/Subscription.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/RefCounted.h>
+#include <set>
+#include <sstream>
+
+namespace qpid {
+namespace client {
+
+/**
+ * A class to help create and manage subscriptions.
+ *
+ * Set up your subscriptions, then call run() to have messages
+ * delivered.
+ *
+ * The usage examples below are written in terms of SubscriptionManager,
+ * the public class whose implementation this class provides.
+ *
+ * \ingroup clientapi
+ *
+ * \details
+ *
+ * <h2>Subscribing and canceling subscriptions</h2>
+ *
+ * <ul>
+ * <li>
+ * <p>subscribe()</p>
+ * <pre> SubscriptionManager subscriptions(session);
+ * Listener listener(subscriptions);
+ * subscriptions.subscribe(listener, myQueue);</pre>
+ * <pre> SubscriptionManager subscriptions(session);
+ * LocalQueue local_queue;
+ * subscriptions.subscribe(local_queue, string("message_queue"));</pre></li>
+ * <li>
+ * <p>cancel()</p>
+ * <pre>subscriptions.cancel();</pre></li>
+ * </ul>
+ *
+ * <h2>Waiting for messages (and returning)</h2>
+ *
+ * <ul>
+ * <li>
+ * <p>run()</p>
+ * <pre> // Give up control to receive messages
+ * subscriptions.run();</pre></li>
+ * <li>
+ * <p>stop()</p>
+ * <pre> // Use this code in a listener to return from run()
+ * subscriptions.stop();</pre></li>
+ * <li>
+ * <p>setAutoStop()</p>
+ * <pre> // Return from subscriptions.run() when last subscription is cancelled
+ * subscriptions.setAutoStop(true);
+ * subscriptions.run();
+ * </pre></li>
+ * <li>
+ * <p>Ending a subscription in a listener</p>
+ * <pre>
+ * void Listener::received(Message&amp; message) {
+ *
+ *  if (message.getData() == "That's all, folks!") {
+ *       subscriptions.cancel(message.getDestination());
+ *   }
+ * }
+ * </pre>
+ * </li>
+ * </ul>
+ *
+ */
+class SubscriptionManagerImpl : public sys::Runnable, public RefCounted
+{
+  public:
+    /** Create a new SubscriptionManagerImpl associated with a session */
+    SubscriptionManagerImpl(const Session& session);
+    
+    /**
+     * Subscribe a MessageListener to receive messages from queue.
+     *
+     * Provide your own subclass of MessageListener to process
+     * incoming messages. It will be called for each message received.
+     *
+     * @param listener Listener object to receive messages.
+     * @param queue Name of the queue to subscribe to.
+     * @param settings settings for the subscription.
+     * @param name unique destination name for the subscription, defaults to
+     * queue name.
+     */
+    Subscription subscribe(MessageListener& listener,
+                           const std::string& queue,
+                           const SubscriptionSettings& settings,
+                           const std::string& name=std::string());
+
+    /**
+     * Subscribe a LocalQueue to receive messages from queue.
+     *
+     * Incoming messages are stored in the queue for you to retrieve.
+     *
+     * @param localQueue LocalQueue in which incoming messages are stored.
+     * @param queue Name of the queue to subscribe to.
+     * @param settings settings for the subscription.
+     * @param name unique destination name for the subscription, defaults to
+     * queue name. If not specified, the queue name is used.
+     */
+    Subscription subscribe(LocalQueue& localQueue,
+                           const std::string& queue,
+                           const SubscriptionSettings& settings,
+                           const std::string& name=std::string());
+
+    /**
+     * Subscribe a MessageListener to receive messages from queue.
+     *
+     * Provide your own subclass of MessageListener to process
+     * incoming messages. It will be called for each message received.
+     *
+     * @param listener Listener object to receive messages.
+     * @param queue Name of the queue to subscribe to.
+     * @param name unique destination name for the subscription, defaults to
+     * queue name. If not specified, the queue name is used.
+     */
+    Subscription subscribe(MessageListener& listener,
+                           const std::string& queue,
+                           const std::string& name=std::string());
+
+    /**
+     * Subscribe a LocalQueue to receive messages from queue.
+     *
+     * Incoming messages are stored in the queue for you to retrieve.
+     *
+     * @param localQueue LocalQueue in which incoming messages are stored.
+     * @param queue Name of the queue to subscribe to.
+     * @param name unique destination name for the subscription, defaults to
+     * queue name. If not specified, the queue name is used.
+     */
+    Subscription subscribe(LocalQueue& localQueue,
+                           const std::string& queue,
+                           const std::string& name=std::string());
+
+
+    /** Get a single message from a queue.
+     * @param result is set to the message from the queue.
+     * @param queue name of the queue to get the message from.
+     * @param timeout wait up to this timeout for a message to appear.
+     * @return true if result was set, false if no message available after
+     * timeout.
+     */
+    bool get(Message& result, const std::string& queue, sys::Duration 
timeout=0);
+
+    /** Get a single message from a queue.
+     * @param queue name of the queue to get the message from.
+     * @param timeout wait up to this timeout for a message to appear.
+     * @return message from the queue.
+     * @throw Exception if the timeout is exceeded.
+     */
+    Message get(const std::string& queue, sys::Duration 
timeout=sys::TIME_INFINITE);
+
+    /** Get a subscription by name.
+     * @throw Exception if not found.
+     */
+    Subscription getSubscription(const std::string& name) const;
+    
+    /** Cancel a subscription. See also: Subscription.cancel() */
+    void cancel(const std::string& name);
+
+    /** Deliver messages in the current thread until stop() is called.
+     * Only one thread may be running in a SubscriptionManager at a time.
+     * @see start
+     */
+    void run();
+
+    /** Start a new thread to deliver messages.
+     * Only one thread may be running in a SubscriptionManager at a time.
+     * @see run
+     */
+    void start();
+
+    /**
+     * Wait for the thread started by a call to start() to complete.
+     */
+    void wait();
+    
+    /** If set true, run() will stop when all subscriptions
+     * are cancelled. If false, run will only stop when stop()
+     * is called. True by default.
+     */
+    void setAutoStop(bool set=true);
+
+    /** Stop delivery. Causes run() to return, or the thread started with
+     * start() to exit.
+     */
+    void stop();
+
+    static const uint32_t UNLIMITED=0xFFFFFFFF;
+
+    /** Set the flow control for a subscription. */
+    void setFlowControl(const std::string& name, const FlowControl& flow);
+
+    /** Set the flow control for a subscription.
+     * @param name: name of the subscription.
+     * @param messages: message credit.
+     * @param bytes: byte credit.
+     * @param window: if true use window-based flow control.
+     */
+    void setFlowControl(const std::string& name, uint32_t messages,  uint32_t 
bytes, bool window=true);
+
+    /** Set the default settings for subscribe() calls that don't
+     * include a SubscriptionSettings parameter.
+     */
+    void setDefaultSettings(const SubscriptionSettings& s) { defaultSettings = 
s; }
+
+    /** Get the default settings for subscribe() calls that don't
+     * include a SubscriptionSettings parameter (read-only access).
+     */
+    const SubscriptionSettings& getDefaultSettings() const { return 
defaultSettings; }
+
+    /** Get a modifiable reference to the default settings for subscribe()
+     * calls that don't include a SubscriptionSettings parameter.
+     */
+    SubscriptionSettings& getDefaultSettings() { return defaultSettings; }
+
+    /**
+     * Set the default flow control settings for subscribe() calls
+     * that don't include a SubscriptionSettings parameter.
+     *
+     * @param messages: message credit.
+     * @param bytes: byte credit.
+     * @param window: if true use window-based flow control.
+     */
+    void setFlowControl(uint32_t messages,  uint32_t bytes, bool window=true) {
+        defaultSettings.flowControl = FlowControl(messages, bytes, window);
+    }
+
+    /**
+     * Set the default accept-mode for subscribe() calls that don't
+     * include a SubscriptionSettings parameter.
+     */
+    void setAcceptMode(AcceptMode mode) { defaultSettings.acceptMode = mode; }
+
+    /**
+     * Set the default acquire-mode for subscribe() calls that don't
+     * specify SubscriptionSettings.
+     */
+    void setAcquireMode(AcquireMode mode) { defaultSettings.acquireMode = 
mode; }
+
+    void registerFailoverHandler ( boost::function<void ()> fh ); // forwards fh to the dispatcher
+
+    Session getSession() const; // returns the session member as a Session
+
+  private:
+    mutable sys::Mutex lock; // held by getSubscription() during lookup; mutable so const members can take it
+    qpid::client::Dispatcher dispatcher; // receives the handler passed to registerFailoverHandler()
+    qpid::client::AsyncSession session; // returned by getSession()
+    bool autoStop; // presumably the flag controlled by setAutoStop() - definition not visible in this header
+    SubscriptionSettings defaultSettings; // read and written by the inline default-settings accessors
+    std::map<std::string, Subscription> subscriptions; // searched by name in getSubscription(); NOTE(review): <map> is not included directly here - confirm it arrives transitively
+};
+
+
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_SUBSCRIPTIONMANAGERIMPL_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManagerImpl.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Mon May  4 17:22:33 2009
@@ -29,6 +29,7 @@
 #include "qpid/client/ConnectionImpl.h"
 #include "qpid/client/Session.h"
 #include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/LocalQueue.h"
 #include "qpid/log/Logger.h"
 #include "qpid/log/Options.h"
 #include "qpid/sys/Thread.h"

Modified: qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Mon May  4 17:22:33 2009
@@ -22,7 +22,9 @@
 #include "test_tools.h"
 #include "BrokerFixture.h"
 #include "qpid/client/QueueOptions.h"
+#include "qpid/client/MessageListener.h"
 #include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/AsyncSession.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Runnable.h"

Modified: qpid/trunk/qpid/cpp/src/tests/echotest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/echotest.cpp?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/echotest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/echotest.cpp Mon May  4 17:22:33 2009
@@ -21,7 +21,7 @@
 
 #include <qpid/client/Connection.h>
 #include <qpid/client/SubscriptionManager.h>
-#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
 #include <qpid/client/Message.h>
 #include <qpid/client/MessageListener.h>
 #include <qpid/sys/Time.h>

Modified: qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Mon May  4 17:22:33 2009
@@ -23,6 +23,7 @@
 #include "test_tools.h"
 #include "BrokerFixture.h"
 #include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/MessageListener.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/framing/reply_exceptions.h"

Modified: qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/latencytest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/latencytest.cpp Mon May  4 17:22:33 2009
@@ -28,6 +28,7 @@
 #include <vector>
 
 #include "TestOptions.h"
+#include "qpid/sys/Thread.h"
 #include "qpid/client/Connection.h"
 #include "qpid/client/Message.h"
 #include "qpid/client/AsyncSession.h"

Modified: qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/perftest.cpp Mon May  4 17:22:33 2009
@@ -28,6 +28,7 @@
 #include "qpid/client/Message.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
 
 #include <boost/lexical_cast.hpp>
 #include <boost/bind.hpp>

Modified: qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Mon May  4 17:22:33 2009
@@ -36,6 +36,7 @@
 #include "qpid/client/Connection.h"
 #include "qpid/client/MessageListener.h"
 #include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
 #include "qpid/client/SubscriptionManager.h"
 #include "qpid/sys/SystemInfo.h"
 #include "qpid/sys/Time.h"

Modified: qpid/trunk/qpid/cpp/src/tests/txshift.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/txshift.cpp?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/txshift.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/txshift.cpp Mon May  4 17:22:33 2009
@@ -61,7 +61,7 @@
 
     Transfer(const std::string control_) : control(control_), expected(0), 
transfered(0) {}
 
-    void subscribeToSource(SubscriptionManager& manager) 
+    void subscribeToSource(SubscriptionManager manager) 
     {
         sourceSettings.autoAck = 0;//will accept once at the end of the batch
         sourceSettings.flowControl = FlowControl::messageCredit(expected);
@@ -69,7 +69,7 @@
         QPID_LOG(info, "Subscribed to source: " << source << " expecting: " << 
expected);
     }
 
-    void subscribeToControl(SubscriptionManager& manager) 
+    void subscribeToControl(SubscriptionManager manager) 
     {
         controlSettings.flowControl = FlowControl::messageCredit(1);
         controlSubscription = manager.subscribe(*this, control, 
controlSettings);

Modified: qpid/trunk/qpid/cpp/src/tests/txtest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/txtest.cpp?rev=771366&r1=771365&r2=771366&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/txtest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/txtest.cpp Mon May  4 17:22:33 2009
@@ -34,6 +34,7 @@
 #include "qpid/framing/Array.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/sys/uuid.h"
+#include "qpid/sys/Thread.h"
 
 using namespace qpid;
 using namespace qpid::client;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to