User: pkendall
Date: 01/08/16 18:52:33
Modified: src/main/org/jboss/mq SpyMessageConsumer.java
Log:
Handle runtime exceptions in message listener code
Revision Changes Path
1.5 +53 -46 jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyMessageConsumer.java 2001/08/17 01:29:34 1.4
+++ SpyMessageConsumer.java 2001/08/17 01:52:33 1.5
@@ -29,7 +29,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author David Maplesden ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyMessageConsumer implements MessageConsumer, SpyConsumer, Runnable {
@@ -308,60 +308,67 @@
SpyMessage mes = null;
try{
outer: while(true){
- //get Message
- while(mes == null){
- mes = session.connection.receive(subscription,0);
- if(mes == null){
- synchronized(messages){
- waitingForMessage = true;
- while(messages.isEmpty() && !closed){
- try{ messages.wait(); }catch(InterruptedException e){}
+ //get Message
+ while(mes == null){
+ mes = session.connection.receive(subscription,0);
+ if(mes == null){
+ synchronized(messages){
+ waitingForMessage = true;
+ while(messages.isEmpty() && !closed){
+ try{ messages.wait(); }catch(InterruptedException e){}
+ }
+ if(closed){
+ waitingForMessage = false;
+ break outer;
+ }
+ mes = (SpyMessage)messages.removeFirst();
+ waitingForMessage = false;
+ }
}
- if(closed){
- waitingForMessage = false;
- break outer;
+ mes.session = session;
+ if (mes.isOutdated()) {
+ //Drop message (it has expired)
+ mes.doAcknowledge();
+ mes = null;
}
- mes = (SpyMessage)messages.removeFirst();
- waitingForMessage = false;
- }
}
- mes.session = session;
- if (mes.isOutdated()) {
- //Drop message (it has expired)
- mes.doAcknowledge();
- mes = null;
- }
- }
- MessageListener thisListener;
- synchronized(stateLock){
- if(!isListening()){
- //send NACK cause we have closed listener
- if(mes != null){
- session.connection.send( mes.getAcknowledgementRequest(false) );
+ MessageListener thisListener;
+ synchronized(stateLock){
+ if(!isListening()){
+ //send NACK cause we have closed listener
+ if(mes != null){
+ session.connection.send( mes.getAcknowledgementRequest(false) );
+ }
+ //this thread is about to die, so we will need a new one if a new listener is added
+ listenerThread = null;
+ mes = null;
+ break;
+ }
+ thisListener = messageListener;
}
- //this thread is about to die, so we will need a new one if a new listener is added
- listenerThread = null;
- mes = null;
- break;
+ Message message = mes;
+ if( mes instanceof SpyEncapsulatedMessage ) {
+ message = ((SpyEncapsulatedMessage)mes).getMessage();
}
- thisListener = messageListener;
- }
- Message message = mes;
- if( mes instanceof SpyEncapsulatedMessage ) {
- message = ((SpyEncapsulatedMessage)mes).getMessage();
- }
- if(session.transacted){
- session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
- }
+ if(session.transacted){
+ session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
+ }
- thisListener.onMessage(message);
+ //Handle runtime exceptions. These are handled as per the spec if you assume
+ //the number of times erroneous messages are redelivered in auto_acknowledge mode
+ //is 0. :)
+ try{
+ thisListener.onMessage(message);
+ }catch(RuntimeException e){
+ cat.warn("Message listener "+thisListener+" threw a RuntimeException.");
+ }
- if (!session.transacted && (session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE)) {
- mes.doAcknowledge();
- }
- mes = null;
+ if (!session.transacted && (session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE)) {
+ mes.doAcknowledge();
+ }
+ mes = null;
}
}catch(JMSException e){
cat.warn("Message consumer closing due to error in listening thread.",e);
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development