User: starksm Date: 01/12/13 14:53:29 Modified: src/main/org/jboss/mq SpyMessageConsumer.java Log: Restore trace level message and switch to Logger. Fix formatting. Revision Changes Path 1.16 +381 -244 jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java Index: SpyMessageConsumer.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java,v retrieving revision 1.15 retrieving revision 1.16 diff -u -r1.15 -r1.16 --- SpyMessageConsumer.java 2001/12/13 22:22:09 1.15 +++ SpyMessageConsumer.java 2001/12/13 22:53:29 1.16 @@ -11,11 +11,12 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; - import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; +import org.jboss.logging.Logger; + /** * This class implements <tt>javax.jms.MessageConsumer</tt>. * @@ -23,18 +24,20 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.15 $ + * @version $Revision: 1.16 $ */ public class SpyMessageConsumer implements MessageConsumer, SpyConsumer, Runnable { + static Logger log = Logger.getLogger( SpyMessageConsumer.class ); + //Link to my session public SpySession session; // The subscription structure should be fill out by the decendent public Subscription subscription = new Subscription(); //Am I closed ? protected boolean closed; - + protected Object stateLock = new Object(); protected boolean receiving = false; protected boolean waitingForMessage = false; @@ -42,455 +45,581 @@ protected Thread listenerThread = null; //My message listener (null if none) MessageListener messageListener; - + //List of Pending messages (not yet delivered) LinkedList messages; - + //Is this a session consumer? boolean sessionConsumer; - - static org.apache.log4j.Category cat = org.apache.log4j.Category.getInstance( SpyMessageConsumer.class ); - + // Constructor --------------------------------------------------- - - SpyMessageConsumer( SpySession s, boolean sessionConsumer ) { + + SpyMessageConsumer( SpySession s, boolean sessionConsumer ) + { session = s; this.sessionConsumer = sessionConsumer; messageListener = null; closed = false; messages = new LinkedList(); } - + public void setMessageListener( MessageListener listener ) - throws JMSException { - - if ( closed ) { + throws JMSException + { + + if ( closed ) + { throw new IllegalStateException( "The MessageConsumer is closed" ); } - - synchronized ( stateLock ) { - if ( receiving ) { + + synchronized ( stateLock ) + { + if ( receiving ) + { throw new JMSException( "Another thread is already in receive." ); } - + boolean oldListening = listening; listening = ( listener != null ); messageListener = listener; - - if ( !sessionConsumer && listening && !oldListening ) { + + if ( !sessionConsumer && listening && !oldListening ) + { //Start listener thread (if one is not already running) - if ( listenerThread == null ) { + if ( listenerThread == null ) + { listenerThread = new Thread( this, "MessageListenerThread - " + subscription.destination.getName() ); listenerThread.start(); } } } } - + // Public -------------------------------------------------------- - + public String getMessageSelector() - throws JMSException { - if ( closed ) { + throws JMSException + { + if ( closed ) + { throw new IllegalStateException( "The MessageConsumer is closed" ); } - + return subscription.messageSelector; } - + public MessageListener getMessageListener() - throws JMSException { - if ( closed ) { + throws JMSException + { + if ( closed ) + { throw new IllegalStateException( "The MessageConsumer is closed" ); } - + return messageListener; } - - public Subscription getSubscription() { + + public Subscription getSubscription() + { return subscription; } - + public Message receive() - throws JMSException { - - if ( closed ) { + throws JMSException + { + + if ( closed ) + { throw new IllegalStateException( "The MessageConsumer is closed" ); } - - synchronized ( stateLock ) { - if ( receiving ) { + + synchronized ( stateLock ) + { + if ( receiving ) + { throw new JMSException( "Another thread is already in receive." ); } - if ( listening ) { + if ( listening ) + { throw new JMSException( "A message listener is already registered" ); } receiving = true; } - - synchronized ( messages ) { + + synchronized ( messages ) + { SpyMessage msg = session.connection.receive( subscription, 0 ); - if ( msg != null ) { + if ( msg != null ) + { Message mes = preProcessMessage( msg ); - if ( mes != null ) { - synchronized ( stateLock ) { + if ( mes != null ) + { + synchronized ( stateLock ) + { receiving = false; } return mes; } } - - try { + + try + { waitingForMessage = true; - while ( true ) { - if ( closed ) { + while ( true ) + { + if ( closed ) + { return null; } Message mes = getMessage(); - if ( mes != null ) { + if ( mes != null ) + { return mes; } - + if( log.isTraceEnabled() ) + log.trace("receive in messages.wait()"); messages.wait(); } - } catch ( InterruptedException e ) { + } + catch ( InterruptedException e ) + { JMSException newE = new SpyJMSException( "Receive interupted" ); newE.setLinkedException( e ); throw newE; - } finally { + } + finally + { waitingForMessage = false; - synchronized ( stateLock ) { + synchronized ( stateLock ) + { receiving = false; } } } - + } - + public Message receive( long timeOut ) - throws JMSException { - if ( timeOut == 0 ) { + throws JMSException + { + if ( timeOut == 0 ) + { return receive(); } - - if ( closed ) { + + if ( closed ) + { throw new IllegalStateException( "The MessageConsumer is closed" ); } - - synchronized ( stateLock ) { - if ( receiving ) { + + synchronized ( stateLock ) + { + if ( receiving ) + { throw new JMSException( "Another thread is already in receive." ); } - if ( listening ) { + if ( listening ) + { throw new JMSException( "A message listener is already registered" ); } receiving = true; } - + long endTime = System.currentTimeMillis() + timeOut; - - synchronized ( messages ) { + + synchronized ( messages ) + { SpyMessage msg = session.connection.receive( subscription, timeOut ); - if ( msg != null ) { + if ( msg != null ) + { Message mes = preProcessMessage( msg ); - if ( mes != null ) { - synchronized ( stateLock ) { + if ( mes != null ) + { + synchronized ( stateLock ) + { receiving = false; } return mes; } } - - try { + + try + { waitingForMessage = true; - while ( true ) { - if ( closed ) { + while ( true ) + { + if ( closed ) + { return null; } - + Message mes = getMessage(); - if ( mes != null ) { + if ( mes != null ) + { return mes; } - + long att = endTime - System.currentTimeMillis(); - if ( att <= 0 ) { + if ( att <= 0 ) + { return null; } - + messages.wait( att ); } - - } catch ( InterruptedException e ) { + + } + catch ( InterruptedException e ) + { JMSException newE = new SpyJMSException( "Receive interupted" ); newE.setLinkedException( e ); throw newE; - } finally { + } finally + { waitingForMessage = false; - synchronized ( stateLock ) { + synchronized ( stateLock ) + { receiving = false; } } } - + } - + public Message receiveNoWait() - throws JMSException { - if ( closed ) { + throws JMSException + { + if ( closed ) + { throw new IllegalStateException( "The MessageConsumer is closed" ); } - - synchronized ( stateLock ) { - if ( receiving ) { + + synchronized ( stateLock ) + { + if ( receiving ) + { throw new JMSException( "Another thread is already in receive." ); } - if ( listening ) { + if ( listening ) + { throw new JMSException( "A message listener is already registered" ); } receiving = true; } - + SpyMessage msg = session.connection.receive( subscription, -1 ); - synchronized ( stateLock ) { + synchronized ( stateLock ) + { receiving = false; } - if ( msg == null ) { + if ( msg == null ) + { return null; } return preProcessMessage( msg ); } - + public void close() - throws JMSException { - - cat.debug("Message consumer closing."); + throws JMSException + { + + log.debug("Message consumer closing."); - synchronized ( messages ) { - if ( closed ) { + synchronized ( messages ) + { + if ( closed ) + { return; } - + closed = true; messages.notify(); } - - if ( listenerThread != null && !Thread.currentThread().equals(listenerThread) ) { - try { listenerThread.join(); } catch(InterruptedException e) { } + + if ( listenerThread != null && !Thread.currentThread().equals(listenerThread) ) + { + try + { + listenerThread.join(); + } + catch(InterruptedException e) + { } } - if ( !sessionConsumer ) { + if ( !sessionConsumer ) + { session.removeConsumer( this ); } } - + public void addMessage( SpyMessage message ) - throws JMSException { - synchronized ( messages ) { - - if ( closed ) { - cat.debug( "WARNING: NACK issued. The message consumer was closed." ); + throws JMSException + { + synchronized ( messages ) + { + + if ( closed ) + { + log.debug( "WARNING: NACK issued. The message consumer was closed." ); session.connection.send( message.getAcknowledgementRequest( false ) ); return; } - + //Add a message to the queue -// Consider removing this test (subscription.accepts). I don't think it will ever fail -// because the test is also done by the server before message is even sent. - if ( subscription.accepts( message.header ) ) { - if ( sessionConsumer ) { + // Consider removing this test (subscription.accepts). I don't think it will ever fail + // because the test is also done by the server before message is even sent. + if ( subscription.accepts( message.header ) ) + { + if ( sessionConsumer ) + { sessionConsumerProcessMessage( message ); - } else { - if ( waitingForMessage ) { + } + else + { + if ( waitingForMessage ) + { messages.addLast( message ); messages.notifyAll(); - } else { + } + else + { //unwanted message (due to consumer receive timing out) Nack it. - cat.debug( "WARNING: NACK issued. The message consumer was not waiting for a message." ); + log.debug( "WARNING: NACK issued. The message consumer was not waiting for a message." ); session.connection.send( message.getAcknowledgementRequest( false ) ); } } - } else { - cat.debug( "WARNING: NACK issued. The subscription did not accept the message" ); + } + else + { + log.debug( "WARNING: NACK issued. The subscription did not accept the message" ); session.connection.send( message.getAcknowledgementRequest( false ) ); } } } - + //Used to facilitate delivery of messages to a message listener. - public void run() { + public void run() + { SpyMessage mes = null; - try { + try + { outer : - while ( true ) { - //get Message - while ( mes == null ) { - synchronized ( messages ) { - if ( closed ) { - waitingForMessage = false; - break outer; - } - mes = session.connection.receive( subscription, 0 ); - if ( mes == null ) { - waitingForMessage = true; - while ( messages.isEmpty() && !closed ) { - try { - messages.wait(); - } catch ( InterruptedException e ) { - } - } - if ( closed ) { + while ( true ) + { + //get Message + while ( mes == null ) + { + synchronized ( messages ) + { + if ( closed ) + { waitingForMessage = false; break outer; + } + mes = session.connection.receive( subscription, 0 ); + if ( mes == null ) + { + waitingForMessage = true; + while ( messages.isEmpty() && !closed ) + { + try + { + messages.wait(); + } catch ( InterruptedException e ) + { + } + } + if ( closed ) + { + waitingForMessage = false; + break outer; + } + mes = ( SpyMessage )messages.removeFirst(); + waitingForMessage = false; } - mes = ( SpyMessage )messages.removeFirst(); - waitingForMessage = false; } + mes.session = session; + if ( mes.isOutdated() ) + { + //Drop message (it has expired) + mes.doAcknowledge(); + mes = null; + } + } + + MessageListener thisListener; + synchronized ( stateLock ) + { + if ( !isListening() ) + { + //send NACK cause we have closed listener + if ( mes != null ) + { + session.connection.send( mes.getAcknowledgementRequest( false ) ); + } + //this thread is about to die, so we will need a new one if a new listener is added + listenerThread = null; + mes = null; + break; + } + thisListener = messageListener; + } + Message message = mes; + if ( mes instanceof SpyEncapsulatedMessage ) + { + message = ( ( SpyEncapsulatedMessage )mes ).getMessage(); } - mes.session = session; - if ( mes.isOutdated() ) { - //Drop message (it has expired) + + if ( session.transacted ) + { + session.connection.spyXAResourceManager.ackMessage( session.currentTransactionId, mes ); + } + + //Handle runtime exceptions. These are handled as per the spec if you assume + //the number of times erroneous messages are redelivered in auto_acknowledge mode + //is 0. :) + try + { + thisListener.onMessage( message ); + } catch ( RuntimeException e ) + { + log.warn( "Message listener " + thisListener + " threw a RuntimeException." ); + } + + if ( !session.transacted && ( session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE ) ) + { mes.doAcknowledge(); - mes = null; } - } - - MessageListener thisListener; - synchronized ( stateLock ) { - if ( !isListening() ) { - //send NACK cause we have closed listener - if ( mes != null ) { - session.connection.send( mes.getAcknowledgementRequest( false ) ); - } - //this thread is about to die, so we will need a new one if a new listener is added - listenerThread = null; - mes = null; - break; - } - thisListener = messageListener; - } - Message message = mes; - if ( mes instanceof SpyEncapsulatedMessage ) { - message = ( ( SpyEncapsulatedMessage )mes ).getMessage(); + mes = null; } - - if ( session.transacted ) { - session.connection.spyXAResourceManager.ackMessage( session.currentTransactionId, mes ); - } - - //Handle runtime exceptions. These are handled as per the spec if you assume - //the number of times erroneous messages are redelivered in auto_acknowledge mode - //is 0. :) - try { - thisListener.onMessage( message ); - } catch ( RuntimeException e ) { - cat.warn( "Message listener " + thisListener + " threw a RuntimeException." ); - } - - if ( !session.transacted && ( session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE ) ) { - mes.doAcknowledge(); - } - mes = null; - } - } catch ( JMSException e ) { - cat.warn( "Message consumer closing due to error in listening thread.", e ); - try { + } catch ( JMSException e ) + { + log.warn( "Message consumer closing due to error in listening thread.", e ); + try + { close(); - } catch ( Exception ignore ) { + } catch ( Exception ignore ) + { } } } - - public String toString() { + + public String toString() + { return "SpyMessageConsumer:" + subscription.destination; } - - protected boolean isListening() { - synchronized ( stateLock ) { + + protected boolean isListening() + { + synchronized ( stateLock ) + { return listening; } } - + protected void sessionConsumerProcessMessage( SpyMessage message ) - throws JMSException { + throws JMSException + { message.session = session; - if ( message.isOutdated() ) { - cat.debug( "SessionQueue: I dropped a message (timeout)" ); + if ( message.isOutdated() ) + { + log.debug( "I dropped a message (timeout)" ); message.doAcknowledge(); return; } //simply pass on to messageListener (if there is one) MessageListener thisListener; - synchronized ( stateLock ) { + synchronized ( stateLock ) + { thisListener = messageListener; } - - // Add the message to XAResource manager before we call onMessages since the + + // Add the message to XAResource manager before we call onMessages since the // resource may get elisted IN the onMessage method. This gives onMessage a chance to roll the message back. Object anonymousTXID=null; - if ( session.transacted ) { + if ( session.transacted ) + { // Only happens with XA transactions - if( session.currentTransactionId == null ) { + if( session.currentTransactionId == null ) + { anonymousTXID = session.connection.spyXAResourceManager.startTx(); session.currentTransactionId = anonymousTXID; } session.connection.spyXAResourceManager.ackMessage( session.currentTransactionId, message ); } - - if ( thisListener != null ) { + + if ( thisListener != null ) + { Message mes = message; - if ( message instanceof SpyEncapsulatedMessage ) { + if ( message instanceof SpyEncapsulatedMessage ) + { mes = ( ( SpyEncapsulatedMessage )message ).getMessage(); } thisListener.onMessage( mes ); } - if (session.transacted) { + if (session.transacted) + { // If we started an anonymous tx - if (anonymousTXID != null) { - if (session.currentTransactionId == anonymousTXID) { + if (anonymousTXID != null) + { + if (session.currentTransactionId == anonymousTXID) + { // This is bad.. We are an XA controled TX but no TM ever elisted us. - // rollback the work and spit an error - try { + // rollback the work and spit an error + try + { session.connection.spyXAResourceManager.endTx(anonymousTXID, true); session.connection.spyXAResourceManager.rollback(anonymousTXID); - } catch (javax.transaction.xa.XAException e) { - cat.error("Could not rollback", e); - } finally { + } catch (javax.transaction.xa.XAException e) + { + log.error("Could not rollback", e); + } finally + { session.currentTransactionId = null; } - throw new SpyJMSException("Messaged delivery was not controled by a Transaction Manager"); + throw new SpyJMSException("Messaged delivery was not controled by a Transaction Manager"); } } - } else { + } + else + { // Should we Auto-ack the message since the message has now been processesed - if (session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) { + if (session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) + { message.doAcknowledge(); } } } - - Message getMessage() { - synchronized ( messages ) { - - while ( true ) { - - try { - if ( messages.size() == 0 ) { + + Message getMessage() + { + synchronized ( messages ) + { + while ( true ) + { + + try + { + if ( messages.size() == 0 ) + { return null; } - + SpyMessage mes = ( SpyMessage )messages.removeFirst(); - + Message rc = preProcessMessage( mes ); // could happen if the message has expired. - if ( rc == null ) { + if ( rc == null ) + { continue; } - + return rc; - } catch ( Exception e ) { + } + catch ( Exception e ) + { e.printStackTrace(); } } @@ -498,30 +627,38 @@ } Message preProcessMessage( SpyMessage message ) - throws JMSException { + throws JMSException + { message.session = session; - + // Has the message expired? - if ( message.isOutdated() ) { - cat.debug( "SessionQueue: I dropped a message (timeout)" ); + if ( message.isOutdated() ) + { + log.debug( "I dropped a message (timeout)" ); message.doAcknowledge(); return null; } - + // Should we try to ack before the message is processed? - if ( !isListening() ) { - - if ( session.transacted ) { + if ( !isListening() ) + { + + if ( session.transacted ) + { session.connection.spyXAResourceManager.ackMessage( session.currentTransactionId, message ); - } else if ( session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE ) { + } else if ( session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE ) + { message.doAcknowledge(); } - - if ( message instanceof SpyEncapsulatedMessage ) { + + if ( message instanceof SpyEncapsulatedMessage ) + { return ( ( SpyEncapsulatedMessage )message ).getMessage(); } return message; - } else { + } + else + { return message; } }
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development