Author: chirino
Date: Fri Feb 10 09:06:46 2006
New Revision: 376766
URL: http://svn.apache.org/viewcvs?rev=376766&view=rev
Log:
Fixed failing ChangeSessionDeliveryModeTest test in the assembly module.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=376766&r1=376765&r2=376766&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Feb 10 09:06:46 2006
@@ -528,10 +528,8 @@
}
}
- protected void checkMessageListener() throws IllegalStateException {
- if (messageListener != null) {
- throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
- }
+ protected void checkMessageListener() throws JMSException {
+ session.checkMessageListener();
}
private void beforeMessageIsConsumed(MessageDispatch md) {
@@ -713,8 +711,17 @@
if (listener != null && started.get()) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
- listener.onMessage(message);
- afterMessageIsConsumed(md, false);
+ try {
+ listener.onMessage(message);
+ afterMessageIsConsumed(md, false);
+ } catch (RuntimeException e) {
+ if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
+ // Redeliver the message
+ } else {
+ // Transacted or Client ack: Deliver the next message.
+ afterMessageIsConsumed(md, false);
+ }
+ }
} else {
unconsumedMessages.enqueue(md);
if (availableListener != null) {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=376766&r1=376765&r2=376766&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Fri Feb 10 09:06:46 2006
@@ -1666,5 +1666,16 @@
return "ActiveMQSession {id="+info.getSessionId()+",started="+started.get()+"}";
}
+ public void checkMessageListener() throws JMSException {
+ if (messageListener != null) {
+ throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
+ }
+ for (Iterator i = consumers.iterator(); i.hasNext();) {
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
+ if( consumer.getMessageListener()!=null ) {
+ throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
+ }
+ }
+ }
}