Author: chirino
Date: Thu Dec 22 07:43:42 2005
New Revision: 358579
URL: http://svn.apache.org/viewcvs?rev=358579&view=rev
Log:
Fixed and added test cases for the consumer start() stop() methods.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java?rev=358579&r1=358578&r2=358579&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java Thu Dec 22 07:43:42 2005
@@ -245,10 +245,7 @@
* @return true if this is a durable topic subscriber
*/
public boolean isDurableSubscriber() {
- // TODO Add ActiveMQTopicSubscriber
- return false;
- // return this instanceof ActiveMQTopicSubscriber && consumerName !=
- // null && consumerName.length() > 0;
+ return info.getSubcriptionName()!=null && info.getDestination().isTopic();
}
/**
@@ -671,8 +668,12 @@
Scheduler.executeAfterDelay(new Runnable() {
public void run() {
- if (started.get())
- unconsumedMessages.start();
+ try {
+ if (started.get())
+ start();
+ } catch (JMSException e) {
+ session.connection.onAsyncException(e);
+ }
}
}, redeliveryDelay);
@@ -695,7 +696,7 @@
MessageListener listener = this.messageListener;
try {
if (!unconsumedMessages.isClosed()) {
- if (listener != null) {
+ if (listener != null && started.get()) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
listener.onMessage(message);
@@ -716,9 +717,19 @@
return unconsumedMessages.size();
}
- public void start() {
+ public void start() throws JMSException {
started.set(true);
unconsumedMessages.start();
+ MessageListener listener = this.messageListener;
+ if( listener!=null ) {
+ MessageDispatch md;
+ while( (md = unconsumedMessages.dequeueNoWait())!=null ) {
+ ActiveMQMessage message = createActiveMQMessage(md);
+ beforeMessageIsConsumed(md);
+ listener.onMessage(message);
+ afterMessageIsConsumed(md, false);
+ }
+ }
}
public void stop() {
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java?rev=358579&r1=358578&r2=358579&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java Thu Dec 22 07:43:42 2005
@@ -59,6 +59,55 @@
public byte destinationType;
public boolean durableConsumer;
+ public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
+ addCombinationValues("deliveryMode", new Object[] {
+ new Integer(DeliveryMode.NON_PERSISTENT),
+ new Integer(DeliveryMode.PERSISTENT) });
+ addCombinationValues("destinationType", new Object[] {
+ new Byte(ActiveMQDestination.QUEUE_TYPE),
+ new Byte(ActiveMQDestination.TOPIC_TYPE),
+ new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE) });
+ }
+ public void testMessageListenerWithConsumerCanBeStopped() throws Throwable {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch done1 = new CountDownLatch(1);
+ final CountDownLatch done2 = new CountDownLatch(1);
+
+ // Receive a message with the JMS API
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ counter.incrementAndGet();
+ if( counter.get()==1 )
+ done1.countDown();
+ if( counter.get()==2 )
+ done2.countDown();
+ }
+ });
+
+ // Send a first message to make sure that the consumer dispatcher is running
+ sendMessages(session, destination, 1);
+ assertTrue(done1.await(1, TimeUnit.SECONDS));
+ assertEquals(1, counter.get());
+
+ // Stop the consumer.
+ consumer.stop();
+
+ // Send a message, but should not get delivered.
+ sendMessages(session, destination, 1);
+ assertFalse(done2.await(1, TimeUnit.SECONDS));
+ assertEquals(1, counter.get());
+
+ // Start the consumer, and the message should now get delivered.
+ consumer.start();
+ assertTrue(done2.await(1, TimeUnit.SECONDS));
+ assertEquals(2, counter.get());
+ }
public void initCombosForTestMutiReceiveWithPrefetch1() {
addCombinationValues("deliveryMode", new Object[] {