Author: gsim
Date: Mon Nov 16 17:30:23 2009
New Revision: 880863
URL: http://svn.apache.org/viewvc?rev=880863&view=rev
Log:
QPID-664: Remove start()/stop() methods from api
Modified:
qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h?rev=880863&r1=880862&r2=880863&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h Mon Nov 16 17:30:23
2009
@@ -80,16 +80,6 @@
* serving before throwing an exception.
*/
QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration
timeout=qpid::sys::TIME_INFINITE);
-
- /**
- * Enables the message flow for this receiver
- */
- QPID_CLIENT_EXTERN void start();
- /**
- * Stops the message flow for this receiver (but does not cancel
- * the subscription).
- */
- QPID_CLIENT_EXTERN void stop();
/**
* Sets the capacity for the receiver. The capacity determines how
* many incoming messages can be held in the receiver before being
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=880863&r1=880862&r2=880863&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Mon Nov 16
17:30:23 2009
@@ -74,12 +74,16 @@
void ReceiverImpl::start()
{
- execute<Start>();
+ if (state == STOPPED) {
+ state = STARTED;
+ startFlow();
+ }
}
void ReceiverImpl::stop()
{
- execute<Stop>();
+ state = STOPPED;
+ session.messageStop(destination);
}
void ReceiverImpl::setCapacity(uint32_t c)
@@ -103,14 +107,14 @@
session = s;
if (state == UNRESOLVED) {
source = resolver.resolveSource(session, address);
- state = STOPPED;//TODO: if session is started, go straight to started
+ state = STARTED;
}
if (state == CANCELLED) {
source->cancel(session, destination);
parent.receiverCancelled(destination);
} else {
source->subscribe(session, destination);
- if (state == STARTED) start();
+ start();
}
}
@@ -171,20 +175,6 @@
}
}
-void ReceiverImpl::startImpl()
-{
- if (state == STOPPED) {
- state = STARTED;
- startFlow();
- }
-}
-
-void ReceiverImpl::stopImpl()
-{
- state = STOPPED;
- session.messageStop(destination);
-}
-
void ReceiverImpl::setCapacityImpl(uint32_t c)
{
if (c != capacity) {
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=880863&r1=880862&r2=880863&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Mon Nov 16
17:30:23 2009
@@ -80,8 +80,6 @@
//implementation of public facing methods
bool fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration
timeout);
bool getImpl(qpid::messaging::Message& message, qpid::sys::Duration
timeout);
- void startImpl();
- void stopImpl();
void cancelImpl();
void setCapacityImpl(uint32_t);
@@ -116,18 +114,6 @@
void operator()() { result = impl.fetchImpl(message, timeout); }
};
- struct Stop : Command
- {
- Stop(ReceiverImpl& i) : Command(i) {}
- void operator()() { impl.stopImpl(); }
- };
-
- struct Start : Command
- {
- Start(ReceiverImpl& i) : Command(i) {}
- void operator()() { impl.startImpl(); }
- };
-
struct Cancel : Command
{
Cancel(ReceiverImpl& i) : Command(i) {}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=880863&r1=880862&r2=880863&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Mon Nov 16
17:30:23 2009
@@ -344,13 +344,18 @@
void SessionImpl::rollbackImpl()
{
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i)
i->second.stop();
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i)
{
+ getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
+ }
//ensure that stop has been processed and all previously sent
//messages are available for release:
session.sync();
incoming.releaseAll();
session.txRollback();
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i)
i->second.start();
+
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i)
{
+ getImplPtr<Receiver, ReceiverImpl>(i->second)->start();
+ }
}
void SessionImpl::acknowledgeImpl()
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=880863&r1=880862&r2=880863&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp Mon Nov 16 17:30:23 2009
@@ -42,8 +42,6 @@
Message Receiver::get(qpid::sys::Duration timeout) { return
impl->get(timeout); }
bool Receiver::fetch(Message& message, qpid::sys::Duration timeout) { return
impl->fetch(message, timeout); }
Message Receiver::fetch(qpid::sys::Duration timeout) { return
impl->fetch(timeout); }
-void Receiver::start() { impl->start(); }
-void Receiver::stop() { impl->stop(); }
void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); }
uint32_t Receiver::getCapacity() { return impl->getCapacity(); }
uint32_t Receiver::available() { return impl->available(); }
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h?rev=880863&r1=880862&r2=880863&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h Mon Nov 16 17:30:23
2009
@@ -41,8 +41,6 @@
virtual Message get(qpid::sys::Duration timeout) = 0;
virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0;
virtual Message fetch(qpid::sys::Duration timeout) = 0;
- virtual void start() = 0;
- virtual void stop() = 0;
virtual void setCapacity(uint32_t) = 0;
virtual uint32_t getCapacity() = 0;
virtual uint32_t available() = 0;
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=880863&r1=880862&r2=880863&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Nov 16 17:30:23
2009
@@ -270,17 +270,14 @@
sender.send(msg);
Receiver sub1 = fix.session.createReceiver(fix.topic);
sub1.setCapacity(10u);
- sub1.start();
msg.setContent("two");
sender.send(msg);
Receiver sub2 = fix.session.createReceiver(fix.topic);
sub2.setCapacity(10u);
- sub2.start();
msg.setContent("three");
sender.send(msg);
Receiver sub3 = fix.session.createReceiver(fix.topic);
sub3.setCapacity(10u);
- sub3.start();
msg.setContent("four");
sender.send(msg);
BOOST_CHECK_EQUAL(fetch(sub2, 2),
boost::assign::list_of<std::string>("three")("four"));
@@ -304,7 +301,6 @@
for (uint i = 0; i < fix.queues.size(); i++) {
Receiver r = fix.session.createReceiver(fix.queues[i]);
r.setCapacity(10u);
- r.start();//TODO: add Session::start
}
for (uint i = 0; i < fix.queues.size(); i++) {
@@ -394,11 +390,9 @@
Receiver r1 = fix.session.createReceiver(fix.queues[0]);
r1.setCapacity(100);
- r1.start();
Receiver r2 = fix.session.createReceiver(fix.queues[1]);
r2.setCapacity(100);
- r2.start();
Sender s1 = fix.session.createSender(fix.queues[0]);
Sender s2 = fix.session.createSender(fix.queues[1]);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]