User: pkendall
  Date: 01/08/16 18:52:33

  Modified:    src/main/org/jboss/mq SpyMessageConsumer.java
  Log:
  Handle runtime exceptions in message listener code
  
  Revision  Changes    Path
  1.5       +53 -46    jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyMessageConsumer.java   2001/08/17 01:29:34     1.4
  +++ SpyMessageConsumer.java   2001/08/17 01:52:33     1.5
  @@ -29,7 +29,7 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author David Maplesden ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class SpyMessageConsumer implements MessageConsumer, SpyConsumer, Runnable {
   
  @@ -308,60 +308,67 @@
        SpyMessage mes = null;
        try{
                outer: while(true){
  -             //get Message
  -             while(mes == null){
  -                     mes = session.connection.receive(subscription,0);
  -                     if(mes == null){
  -                     synchronized(messages){
  -                             waitingForMessage = true;
  -                             while(messages.isEmpty() && !closed){
  -                             try{ messages.wait(); }catch(InterruptedException e){}
  +                     //get Message
  +                     while(mes == null){
  +                             mes = session.connection.receive(subscription,0);
  +                             if(mes == null){
  +                                     synchronized(messages){
  +                                             waitingForMessage = true;
  +                                             while(messages.isEmpty() && !closed){
  +                                                     try{ messages.wait(); 
}catch(InterruptedException e){}
  +                                             }
  +                                             if(closed){
  +                                                     waitingForMessage = false;
  +                                                     break outer;
  +                                             }
  +                                             mes = 
(SpyMessage)messages.removeFirst();
  +                                             waitingForMessage = false;
  +                                     }
                                }
  -                             if(closed){
  -                             waitingForMessage = false;
  -                             break outer;
  +                             mes.session = session;
  +                             if (mes.isOutdated()) {
  +                                     //Drop message (it has expired)
  +                                     mes.doAcknowledge();
  +                                     mes = null;
                                }
  -                             mes = (SpyMessage)messages.removeFirst();
  -                             waitingForMessage = false;
  -                     }
                        }
  -                     mes.session = session;
  -                     if (mes.isOutdated()) {
  -                     //Drop message (it has expired)
  -                     mes.doAcknowledge();
  -                     mes = null;
  -                     }
  -             }
   
  -             MessageListener thisListener;
  -             synchronized(stateLock){
  -                     if(!isListening()){
  -                     //send NACK cause we have closed listener
  -                     if(mes != null){
  -                             session.connection.send( 
mes.getAcknowledgementRequest(false) );
  +                     MessageListener thisListener;
  +                     synchronized(stateLock){
  +                             if(!isListening()){
  +                                     //send NACK cause we have closed listener
  +                                     if(mes != null){
  +                                             session.connection.send( 
mes.getAcknowledgementRequest(false) );
  +                                     }
  +                                     //this thread is about to die, so we will need 
a new one if a new listener is added
  +                                     listenerThread = null;
  +                                     mes = null;
  +                                     break;
  +                             }
  +                             thisListener = messageListener;
                        }
  -                     //this thread is about to die, so we will need a new one if a 
new listener is added
  -                     listenerThread = null;
  -                     mes = null;
  -                     break;
  +                     Message message = mes;
  +                     if( mes instanceof SpyEncapsulatedMessage ) {
  +                             message = ((SpyEncapsulatedMessage)mes).getMessage();
                        }
  -                     thisListener = messageListener;
  -             }
  -             Message message = mes;
  -             if( mes instanceof SpyEncapsulatedMessage ) {
  -                     message = ((SpyEncapsulatedMessage)mes).getMessage();
  -             }
   
  -             if(session.transacted){
  -                     
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
  -             }
  +                     if(session.transacted){
  +                             
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
  +                     }
   
  -             thisListener.onMessage(message);
  +                     //Handle runtime exceptions.  These are handled as per the 
spec if you assume
  +                     //the number of times erroneous messages are redelivered in 
auto_acknowledge mode
  +                     //is 0.   :)
  +                     try{
  +                             thisListener.onMessage(message);
  +                     }catch(RuntimeException e){
  +                             cat.warn("Message listener "+thisListener+" threw a 
RuntimeException.");
  +                     }
   
  -             if (!session.transacted && (session.acknowledgeMode == 
session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE)) {
  -                     mes.doAcknowledge();
  -             }
  -             mes = null;
  +                     if (!session.transacted && (session.acknowledgeMode == 
session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE)) {
  +                             mes.doAcknowledge();
  +                     }
  +                     mes = null;
                }
        }catch(JMSException e){
                cat.warn("Message consumer closing due to error in listening 
thread.",e);
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to