User: dmaplesden
Date: 01/09/19 20:54:42
Modified: src/main/org/jboss/mq SpySession.java
SpyMessageConsumer.java Connection.java
Log:
Fixed a NullPointerException (NPE) that occurred when a receive happens after a
subscription is closed. Also tidied up the closing down of sessions whose receivers
have message listeners.
Revision Changes Path
1.4 +38 -36 jbossmq/src/main/org/jboss/mq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpySession.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpySession.java 2001/08/17 03:04:01 1.3
+++ SpySession.java 2001/09/20 03:54:41 1.4
@@ -33,7 +33,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public abstract class SpySession
implements Session, XASession {
@@ -225,38 +225,40 @@
throws JMSException {
// allow other threads to process before closing this session
// Patch submitted by John Ellis (10/29/00)
- Thread.yield();
+// Thread.yield();
+ cat.debug("Session closing.");
- //deal with any unacked messages
- if ( !closed && transacted && spyXAResource == null ) {
- rollback();
- }
-
synchronized ( runLock ) {
+
if ( closed ) {
return;
}
- closed = true;
- }
-
- Iterator i;
- synchronized ( consumers ) {
- //notify the sleeping synchronous listeners
- if ( sessionConsumer != null ) {
- sessionConsumer.close();
+ Iterator i;
+ synchronized ( consumers ) {
+
+ //notify the sleeping synchronous listeners
+ if ( sessionConsumer != null ) {
+ sessionConsumer.close();
+ }
+
+ i = consumers.iterator();
}
-
- i = consumers.iterator();
- }
+
+ while ( i.hasNext() ) {
+ SpyMessageConsumer messageConsumer = ( SpyMessageConsumer )i.next();
+ messageConsumer.close();
+ }
+
+ //deal with any unacked messages
+ if ( transacted && spyXAResource == null ) {
+ rollback();
+ }
+
+ connection.sessionClosing( this );
- while ( i.hasNext() ) {
- SpyMessageConsumer messageConsumer = ( SpyMessageConsumer )i.next();
- messageConsumer.close();
+ closed = true;
}
-
- connection.sessionClosing( this );
-
}
@@ -298,21 +300,21 @@
//Rollback a transacted session
public synchronized void rollback()
throws JMSException {
- if ( spyXAResource != null ) {
- throw new javax.jms.TransactionInProgressException( "Should not be call
from a XASession" );
- }
- if ( closed ) {
- throw new IllegalStateException( "The session is closed" );
- }
- if ( !transacted ) {
- throw new IllegalStateException( "The session is not transacted" );
- }
- cat.debug( "Session: rollback()" );
-
- // Stop message delivery
synchronized ( runLock ) {
+ if ( spyXAResource != null ) {
+ throw new javax.jms.TransactionInProgressException( "Should not be call
from a XASession" );
+ }
+ if ( closed ) {
+ throw new IllegalStateException( "The session is closed" );
+ }
+ if ( !transacted ) {
+ throw new IllegalStateException( "The session is not transacted" );
+ }
+
+ cat.debug( "Session: rollback()" );
+
// rollback transaction
try {
connection.spyXAResourceManager.endTx( currentTransactionId, true );
1.8 +14 -2 jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SpyMessageConsumer.java 2001/08/21 23:16:17 1.7
+++ SpyMessageConsumer.java 2001/09/20 03:54:41 1.8
@@ -23,7 +23,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author David Maplesden ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class SpyMessageConsumer
implements MessageConsumer, SpyConsumer, Runnable
@@ -264,6 +264,8 @@
public void close()
throws JMSException {
+ cat.debug("Message consumer closing.");
+
synchronized ( messages ) {
if ( closed ) {
return;
@@ -273,6 +275,10 @@
messages.notify();
}
+ if ( listenerThread != null && !Thread.currentThread().equals(listenerThread)
) {
+ try { listenerThread.join(); } catch(InterruptedException e) { }
+ }
+
if ( !sessionConsumer ) {
session.removeConsumer( this );
}
@@ -316,7 +322,13 @@
while ( true ) {
//get Message
while ( mes == null ) {
- mes = session.connection.receive( subscription, 0 );
+ synchronized ( messages ) {
+ if ( closed ) {
+ waitingForMessage = false;
+ break outer;
+ }
+ mes = session.connection.receive( subscription, 0 );
+ }
if ( mes == null ) {
synchronized ( messages ) {
waitingForMessage = true;
1.5 +3 -3 jbossmq/src/main/org/jboss/mq/Connection.java
Index: Connection.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/Connection.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- Connection.java 2001/08/28 21:38:25 1.4
+++ Connection.java 2001/09/20 03:54:41 1.5
@@ -36,7 +36,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class Connection implements java.io.Serializable, javax.jms.Connection {
//Maps a destination to a LinkedList of Subscriptions
@@ -329,13 +329,13 @@
for ( int i = 0; i < requests.length; i++ ) {
SpyConsumer consumer = ( SpyConsumer )subscriptions.get(
requests[i].subscriptionId );
+
requests[i].message.createAcknowledgementRequest(requests[i].subscriptionId.intValue());
+
if ( consumer == null ) {
send( requests[i].message.getAcknowledgementRequest( false ) );
cat.debug( "WARNING: NACK issued due to non existent subscription" );
continue;
}
-
-
requests[i].message.createAcknowledgementRequest(requests[i].subscriptionId.intValue());
consumer.addMessage( requests[i].message );
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development