User: dmaplesden
  Date: 01/09/19 20:54:42

  Modified:    src/main/org/jboss/mq SpySession.java
                        SpyMessageConsumer.java Connection.java
  Log:
  fixed npe when a receive occurs after a subscription is closed.  Also tidied up 
closing down of sessions with receivers with message listeners.
  
  Revision  Changes    Path
  1.4       +38 -36    jbossmq/src/main/org/jboss/mq/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpySession.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SpySession.java   2001/08/17 03:04:01     1.3
  +++ SpySession.java   2001/09/20 03:54:41     1.4
  @@ -33,7 +33,7 @@
    * @author     Norbert Lataille ([EMAIL PROTECTED])
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.3 $
  + * @version    $Revision: 1.4 $
    */
   public abstract class SpySession
          implements Session, XASession {
  @@ -225,38 +225,40 @@
         throws JMSException {
         // allow other threads to process before closing this session
         // Patch submitted by John Ellis (10/29/00)
  -      Thread.yield();
  +//      Thread.yield();
  +      cat.debug("Session closing.");
   
  -      //deal with any unacked messages
  -      if ( !closed && transacted && spyXAResource == null ) {
  -         rollback();
  -      }
  -
         synchronized ( runLock ) {
  +
            if ( closed ) {
               return;
            }
  -         closed = true;
  -      }
  -
  -      Iterator i;
  -      synchronized ( consumers ) {
   
  -         //notify the sleeping synchronous listeners
  -         if ( sessionConsumer != null ) {
  -            sessionConsumer.close();
  +         Iterator i;
  +         synchronized ( consumers ) {
  +   
  +            //notify the sleeping synchronous listeners
  +            if ( sessionConsumer != null ) {
  +               sessionConsumer.close();
  +            }
  +   
  +            i = consumers.iterator();
            }
  -
  -         i = consumers.iterator();
  -      }
  +   
  +         while ( i.hasNext() ) {
  +            SpyMessageConsumer messageConsumer = ( SpyMessageConsumer )i.next();
  +            messageConsumer.close();
  +         }
  +   
  +         //deal with any unacked messages
  +         if ( transacted && spyXAResource == null ) {
  +            rollback();
  +         }
  +      
  +         connection.sessionClosing( this );
   
  -      while ( i.hasNext() ) {
  -         SpyMessageConsumer messageConsumer = ( SpyMessageConsumer )i.next();
  -         messageConsumer.close();
  +         closed = true;
         }
  -
  -      connection.sessionClosing( this );
  -
      }
   
   
  @@ -298,21 +300,21 @@
      //Rollback a transacted session
      public synchronized void rollback()
         throws JMSException {
  -      if ( spyXAResource != null ) {
  -         throw new javax.jms.TransactionInProgressException( "Should not be call 
from a XASession" );
  -      }
  -      if ( closed ) {
  -         throw new IllegalStateException( "The session is closed" );
  -      }
  -      if ( !transacted ) {
  -         throw new IllegalStateException( "The session is not transacted" );
  -      }
   
  -      cat.debug( "Session: rollback()" );
  -
  -      // Stop message delivery
         synchronized ( runLock ) {
   
  +         if ( spyXAResource != null ) {
  +            throw new javax.jms.TransactionInProgressException( "Should not be call 
from a XASession" );
  +         }
  +         if ( closed ) {
  +            throw new IllegalStateException( "The session is closed" );
  +         }
  +         if ( !transacted ) {
  +            throw new IllegalStateException( "The session is not transacted" );
  +         }
  +   
  +         cat.debug( "Session: rollback()" );
  +   
            // rollback transaction
            try {
               connection.spyXAResourceManager.endTx( currentTransactionId, true );
  
  
  
  1.8       +14 -2     jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SpyMessageConsumer.java   2001/08/21 23:16:17     1.7
  +++ SpyMessageConsumer.java   2001/09/20 03:54:41     1.8
  @@ -23,7 +23,7 @@
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @author     David Maplesden ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.7 $
  + * @version    $Revision: 1.8 $
    */
   public class SpyMessageConsumer
      implements MessageConsumer, SpyConsumer, Runnable
  @@ -264,6 +264,8 @@
      public void close()
         throws JMSException {
   
  +      cat.debug("Message consumer closing.");
  +      
         synchronized ( messages ) {
            if ( closed ) {
               return;
  @@ -273,6 +275,10 @@
            messages.notify();
         }
   
  +      if ( listenerThread != null && !Thread.currentThread().equals(listenerThread) 
) {
  +         try { listenerThread.join(); } catch(InterruptedException e) { }
  +      }
  +      
         if ( !sessionConsumer ) {
            session.removeConsumer( this );
         }
  @@ -316,7 +322,13 @@
            while ( true ) {
               //get Message
               while ( mes == null ) {
  -               mes = session.connection.receive( subscription, 0 );
  +               synchronized ( messages ) {
  +                  if ( closed ) {
  +                     waitingForMessage = false;
  +                     break outer;
  +                  }  
  +                  mes = session.connection.receive( subscription, 0 );
  +               }
                  if ( mes == null ) {
                     synchronized ( messages ) {
                        waitingForMessage = true;
  
  
  
  1.5       +3 -3      jbossmq/src/main/org/jboss/mq/Connection.java
  
  Index: Connection.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/Connection.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- Connection.java   2001/08/28 21:38:25     1.4
  +++ Connection.java   2001/09/20 03:54:41     1.5
  @@ -36,7 +36,7 @@
    * @author     Norbert Lataille ([EMAIL PROTECTED])
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.4 $
  + * @version    $Revision: 1.5 $
    */
   public class Connection implements java.io.Serializable, javax.jms.Connection {
      //Maps a destination to a LinkedList of Subscriptions
  @@ -329,13 +329,13 @@
            for ( int i = 0; i < requests.length; i++ ) {
   
               SpyConsumer consumer = ( SpyConsumer )subscriptions.get( 
requests[i].subscriptionId );
  +            
requests[i].message.createAcknowledgementRequest(requests[i].subscriptionId.intValue());
  +
               if ( consumer == null ) {
                  send( requests[i].message.getAcknowledgementRequest( false ) );
                  cat.debug( "WARNING: NACK issued due to non existent subscription" );
                  continue;
               }
  -
  -            
requests[i].message.createAcknowledgementRequest(requests[i].subscriptionId.intValue());
   
               consumer.addMessage( requests[i].message );
            }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to