User: chirino 
  Date: 01/09/25 22:02:27

  Modified:    src/main/org/jboss/mq Connection.java
  Log:
  Several modification to support the client connection sending ping messages
  to the server and the server sending back pong messages.  These messages
  are used to determine if the connection has gone down.
  
  This fixes the bug with the ExceptionListner not being notified of
  the server failing when the client is only receiving messages.
  
  Revision  Changes    Path
  1.6       +717 -311  jbossmq/src/main/org/jboss/mq/Connection.java
  
  Index: Connection.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/Connection.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- Connection.java   2001/09/20 03:54:41     1.5
  +++ Connection.java   2001/09/26 05:02:27     1.6
  @@ -1,10 +1,11 @@
   /*
  - * JBossMQ, the OpenSource JMS implementation
  + * JBoss, the OpenSource J2EE webOS
    *
    * Distributable under LGPL license.
    * See terms of license at gnu.org.
    */
   package org.jboss.mq;
  +import EDU.oswego.cs.dl.util.concurrent.Semaphore;
   
   import java.io.File;
   import java.io.FileInputStream;
  @@ -31,66 +32,114 @@
   import org.jboss.mq.il.ServerIL;
   
   /**
  - *  This class implements javax.jms.Connection
  + * This class implements javax.jms.Connection
    *
  - * @author     Norbert Lataille ([EMAIL PROTECTED])
  - * @author     Hiram Chirino ([EMAIL PROTECTED])
  - * @created    August 16, 2001
  - * @version    $Revision: 1.5 $
  + * @author    Norbert Lataille ([EMAIL PROTECTED])
  + * @author    Hiram Chirino ([EMAIL PROTECTED])
  + * @version   $Revision: 1.6 $
  + * @created   August 16, 2001
    */
  -public class Connection implements java.io.Serializable, javax.jms.Connection {
  +public class Connection implements java.io.Serializable, javax.jms.Connection
  +{
  +   /**
  +    * Description of the Field
  +    */
  +   public static ThreadGroup threadGroup = new ThreadGroup("JBossMQ Client 
Threads");
  +   /**
  +    * Description of the Field
  +    */
  +   protected final static long PING_INTERVAL = 1000 * 60;
  +   static org.apache.log4j.Category cat = 
org.apache.log4j.Category.getInstance(Connection.class);
      //Maps a destination to a LinkedList of Subscriptions
  -   public HashMap   destinationSubscriptions = new HashMap();
  +   /**
  +    * Description of the Field
  +    */
  +   public HashMap destinationSubscriptions = new HashMap();
      //Maps a a subsction id to a Subscription
  -   public HashMap   subscriptions = new HashMap();
  +   /**
  +    * Description of the Field
  +    */
  +   public HashMap subscriptions = new HashMap();
      //Is the connection stopped ?
  -   public boolean   modeStop;
  +   /**
  +    * Description of the Field
  +    */
  +   public boolean modeStop;
      //////////////////////////////////////////////////////////////
      // Attributes
      //////////////////////////////////////////////////////////////
   
      // This is our connection to the JMS server
  +   /**
  +    * Description of the Field
  +    */
      protected ServerIL serverIL;
      //This is the clientID
  +   /**
  +    * Description of the Field
  +    */
      protected String clientID;
      // The connection token is used to identify our connection
      // to the server.
  +   /**
  +    * Description of the Field
  +    */
      protected ConnectionToken connectionToken;
   
      // The object that sets up the client IL
  +   /**
  +    * Description of the Field
  +    */
      protected ClientILService clientILService;
   
  +   // The thread that pings the connection to see if it is 'alive'
  +   /**
  +    * Description of the Field
  +    */
  +   protected Thread pingThread;
  +   /**
  +    * Description of the Field
  +    */
  +   protected boolean ponged;
  +   /**
  +    * Description of the Field
  +    */
  +   protected Semaphore pingTaskSemaphore = new Semaphore(0);
  +   /**
  +    * Description of the Field
  +    */
  +   protected volatile boolean closing = false;
  +
      //LinkedList of all created sessions by this connection
  -   HashSet          createdSessions;
  +   HashSet createdSessions;
      // Numbers subscriptions
  -   int              subscriptionCounter = Integer.MIN_VALUE;
  +   int subscriptionCounter = Integer.MIN_VALUE;
      //Is the connection closed ?
  -   boolean          closed;
  +   boolean closed;
      // Used to control tranactions
      SpyXAResourceManager spyXAResourceManager;
   
      //The class that created this connection
      GenericConnectionFactory genericConnectionFactory;
      //Last message ID returned
  -   private int      lastMessageID;
  +   private int lastMessageID;
   
      //the exceptionListener
      private ExceptionListener exceptionListener;
   
      //Get a new messageID (creation of a new message)
      private StringBuffer sb = new StringBuffer();
  -   private char[]   charStack = new char[22];
  -   public static ThreadGroup threadGroup = new ThreadGroup( "JBossMQ Client 
Threads" );
  -   static org.apache.log4j.Category cat = org.apache.log4j.Category.getInstance( 
Connection.class );
  +   private char[] charStack = new char[22];
   
      //////////////////////////////////////////////////////////////
      // Constructors
      //////////////////////////////////////////////////////////////
   
  -   Connection( String userName, String password, GenericConnectionFactory 
genericConnectionFactory )
  -      throws JMSException {
  +   Connection(String userName, String password, GenericConnectionFactory 
genericConnectionFactory)
  +          throws JMSException
  +   {
   
  -      cat.debug( "Connection Initializing" );
  +      cat.debug("Connection Initializing");
   
         //Set the attributes
         createdSessions = new HashSet();
  @@ -100,29 +149,43 @@
         modeStop = true;
   
         // Connect to the server
  -      cat.debug( "Getting the serverIL" );
  +      cat.debug("Getting the serverIL");
         this.genericConnectionFactory = genericConnectionFactory;
         serverIL = genericConnectionFactory.createServerIL();
   
  -      cat.debug( "Authenticating" );
  +      cat.debug("Authenticating");
         // Authenticate with the server
  -      if ( userName != null ) {
  -         askForAnID( userName, password );
  +      if (userName != null)
  +      {
  +         askForAnID(userName, password);
         }
  -      if ( clientID == null ) {
  +      if (clientID == null)
  +      {
            askForAnID();
         }
   
         // Setup the ClientIL service so that the Server can
         // push messages to us
  -      cat.debug( "Starting the clientIL service" );
  +      cat.debug("Starting the clientIL service");
         startILService();
   
         // Setup the XA Resource manager,
  -      spyXAResourceManager = new SpyXAResourceManager( this );
  +      spyXAResourceManager = new SpyXAResourceManager(this);
   
  -      // Initialize the thread pool
  -      cat.debug( "Connection establishment successful" );
  +      // Start the pingThread...
  +      pingThread = new Thread(threadGroup, new PingTask(), "Connection Monitor");
  +      pingThread.start();
  +      // Wait for the ping thread to start..
  +      try
  +      {
  +         pingTaskSemaphore.acquire();
  +      }
  +      catch (InterruptedException e)
  +      {
  +         Thread.currentThread().interrupt();
  +      }
  +
  +      cat.debug("Connection establishment successful");
   
      }
   
  @@ -130,37 +193,60 @@
      // Constructors
      //////////////////////////////////////////////////////////////
   
  -   Connection( GenericConnectionFactory genericConnectionFactory )
  -      throws JMSException {
  -      this( null, null, genericConnectionFactory );
  +   Connection(GenericConnectionFactory genericConnectionFactory)
  +          throws JMSException
  +   {
  +      this(null, null, genericConnectionFactory);
      }
   
  -   public void setClientID( String cID )
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  -      }
  -      if ( clientID != null ) {
  -         throw new IllegalStateException( "The connection has already a clientID" );
  +   /**
  +    * Sets the ClientID attribute of the Connection object
  +    *
  +    * @param cID               The new ClientID value
  +    * @exception JMSException  Description of Exception
  +    */
  +   public void setClientID(String cID)
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
  +      }
  +      if (clientID != null)
  +      {
  +         throw new IllegalStateException("The connection has already a clientID");
  +      }
  +
  +      cat.debug("SetClientID(" + clientID + ")");
  +
  +      try
  +      {
  +         serverIL.checkID(cID);
         }
  -
  -      cat.debug( "SetClientID(" + clientID + ")" );
  -
  -      try {
  -         serverIL.checkID( cID );
  -      } catch ( JMSException e ) {
  +      catch (JMSException e)
  +      {
            throw e;
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot connect to the JMSServer", e );
         }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot connect to the JMSServer", e);
  +      }
   
         clientID = cID;
      }
   
  -   public void setExceptionListener( ExceptionListener listener )
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  +   /**
  +    * Sets the ExceptionListener attribute of the Connection object
  +    *
  +    * @param listener          The new ExceptionListener value
  +    * @exception JMSException  Description of Exception
  +    */
  +   public void setExceptionListener(ExceptionListener listener)
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
         }
   
         exceptionListener = listener;
  @@ -170,103 +256,224 @@
      // Public Methods
      //////////////////////////////////////////////////////////////
   
  +   /**
  +    * Gets the ClientID attribute of the Connection object
  +    *
  +    * @return                  The ClientID value
  +    * @exception JMSException  Description of Exception
  +    */
      public String getClientID()
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
         }
         return clientID;
      }
   
  -   public ConnectionMetaData getMetaData()
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  +   /**
  +    * Gets the ExceptionListener attribute of the Connection object
  +    *
  +    * @return                  The ExceptionListener value
  +    * @exception JMSException  Description of Exception
  +    */
  +   public ExceptionListener getExceptionListener()
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
         }
   
  -      return new SpyConnectionMetaData();
  +      return exceptionListener;
      }
   
  -   public ExceptionListener getExceptionListener()
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  +   /**
  +    * Gets the MetaData attribute of the Connection object
  +    *
  +    * @return                  The MetaData value
  +    * @exception JMSException  Description of Exception
  +    */
  +   public ConnectionMetaData getMetaData()
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
         }
   
  -      return exceptionListener;
  +      return new SpyConnectionMetaData();
      }
   
  -   public ServerIL getServerIL() {
  +   /**
  +    * Gets the ServerIL attribute of the Connection object
  +    *
  +    * @return   The ServerIL value
  +    */
  +   public ServerIL getServerIL()
  +   {
         return serverIL;
      }
   
  -   public void start()
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  -      }
  +   /**
  +    * #Description of the Method
  +    */
  +   public void asynchClose()
  +   {
  +   }
   
  -      if ( !modeStop ) {
  -         return;
  +   //called by a TemporaryDestination which is going to be deleted()
  +   /**
  +    * #Description of the Method
  +    *
  +    * @param dest  Description of Parameter
  +    */
  +   public void asynchDeleteTemporaryDestination(SpyDestination dest)
  +   {
  +      try
  +      {
  +         deleteTemporaryDestination(dest);
  +      }
  +      catch (JMSException e)
  +      {
  +         asynchFailure(e.getMessage(), e.getLinkedException());
         }
  -      modeStop = false;
  +
  +   }
   
  -      cat.debug( "Starting connection, ClientID=" + connectionToken.getClientID() );
  +   //Gets the first consumer that is listening to a destination.
  +   /**
  +    * #Description of the Method
  +    *
  +    * @param requests  Description of Parameter
  +    */
  +   public void asynchDeliver(ReceiveRequest requests[])
  +   {
   
  -      try {
  -         serverIL.setEnabled( connectionToken, true );
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot enable the connection with the JMS 
server", e );
  +      try
  +      {
  +         for (int i = 0; i < requests.length; i++)
  +         {
  +
  +            SpyConsumer consumer = 
(SpyConsumer)subscriptions.get(requests[i].subscriptionId);
  +            
requests[i].message.createAcknowledgementRequest(requests[i].subscriptionId.intValue());
  +
  +            if (consumer == null)
  +            {
  +               send(requests[i].message.getAcknowledgementRequest(false));
  +               cat.debug("WARNING: NACK issued due to non existent subscription");
  +               continue;
  +            }
  +
  +            consumer.addMessage(requests[i].message);
  +         }
         }
  +      catch (JMSException e)
  +      {
  +         asynchFailure(e.getMessage(), e.getLinkedException());
  +      }
      }
   
  -   public void stop()
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  -      }
  +   /**
  +    * #Description of the Method
  +    *
  +    * @param reason  Description of Parameter
  +    * @param e       Description of Parameter
  +    */
  +   public void asynchFailure(String reason, Exception e)
  +   {
   
  -      if ( modeStop ) {
  +      // Exceptions due to closing will be ignored.
  +      if (closing)
  +      {
            return;
         }
  -      modeStop = true;
   
  -      cat.debug( "Stoping connection, ClientID=" + connectionToken.getClientID() );
  +      JMSException excep = new SpyJMSException(reason, e);
  +      excep.fillInStackTrace();
   
  -      try {
  -         serverIL.setEnabled( connectionToken, false );
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot disable the connection with the JMS 
server", e );
  +      if (exceptionListener != null)
  +      {
  +         synchronized (exceptionListener)
  +         {
  +            exceptionListener.onException(excep);
  +         }
         }
  +      else
  +      {
  +         cat.warn("Connection failure: ", excep);
  +      }
  +   }
  +
  +   /**
  +    * #Description of the Method
  +    *
  +    * @param serverTime  Description of Parameter
  +    */
  +   public void asynchPong(long serverTime)
  +   {
  +      cat.debug("PONG");
  +      ponged = true;
      }
   
  +   /**
  +    * #Description of the Method
  +    *
  +    * @exception JMSException  Description of Exception
  +    */
      public synchronized void close()
  -      throws JMSException {
  -      if ( closed ) {
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
            return;
         }
  +
  +      cat.debug("Closing sessions, ClientID=" + connectionToken.getClientID());
  +      closing = true;
   
  -      cat.debug( "Closing sessions, ClientID=" + connectionToken.getClientID() );
  +      // Stop the ping thread
  +      pingThread.interrupt();
  +
         //notify his sessions
  -      synchronized ( createdSessions ) {
  +      synchronized (createdSessions)
  +      {
   
            Object[] vect = createdSessions.toArray();
  -         for ( int i = 0; i < vect.length; i++ ) {
  -            ( ( SpySession )vect[i] ).close();
  +         for (int i = 0; i < vect.length; i++)
  +         {
  +            ((SpySession)vect[i]).close();
            }
   
         }
  -      cat.debug( "Closed sessions" );
  +      cat.debug("Closed sessions");
   
  -      cat.debug( "Disconnecting from server" );
  +      cat.debug("Notifiying the server of close");
         //Notify the JMSServer that I am closing
  -      try {
  -         serverIL.connectionClosing( connectionToken );
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot close properly the connection", e );
  +      try
  +      {
  +         serverIL.connectionClosing(connectionToken);
  +      }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot close properly the connection", e);
  +      }
  +
  +      cat.debug("Waiting for ping thread to stop");
  +      // Try to wait for the ping thread to stop.
  +      try
  +      {
  +         pingTaskSemaphore.attempt(1000 * 10);
  +      }
  +      catch (InterruptedException e)
  +      {
  +         Thread.currentThread().interrupt();
         }
  +
  +      cat.debug("Stoping the ClientIL service");
         stopILService();
  -      cat.debug( "Disconnected from server" );
  +      cat.debug("Disconnected from server");
   
         // Only set the closed flag after all the objects that depend
         // on this connection have been closed.
  @@ -274,140 +481,206 @@
      }
   
      //called by a TemporaryDestination which is going to be deleted()
  -   public void deleteTemporaryDestination( SpyDestination dest )
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  +   /**
  +    * #Description of the Method
  +    *
  +    * @param dest              Description of Parameter
  +    * @exception JMSException  Description of Exception
  +    */
  +   public void deleteTemporaryDestination(SpyDestination dest)
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
         }
   
  -      cat.debug( "SpyConnection: deleteDestination(dest=" + dest.toString() + ")" );
  +      cat.debug("SpyConnection: deleteDestination(dest=" + dest.toString() + ")");
   
  -      try {
  +      try
  +      {
   
            //Remove it from the destinations list
  -         synchronized ( subscriptions ) {
  -            destinationSubscriptions.remove( dest );
  +         synchronized (subscriptions)
  +         {
  +            destinationSubscriptions.remove(dest);
            }
   
            //Notify its sessions that this TemporaryDestination is going to be 
deleted()
            //We could do that only on the Sessions "linked" to this Destination
  -         synchronized ( createdSessions ) {
  +         synchronized (createdSessions)
  +         {
   
               Iterator i = createdSessions.iterator();
  -            while ( i.hasNext() ) {
  -               ( ( SpySession )i.next() ).deleteTemporaryDestination( dest );
  +            while (i.hasNext())
  +            {
  +               ((SpySession)i.next()).deleteTemporaryDestination(dest);
               }
   
            }
   
            //Ask the broker to delete() this TemporaryDestination
  -         serverIL.deleteTemporaryDestination( connectionToken, dest );
  +         serverIL.deleteTemporaryDestination(connectionToken, dest);
   
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot delete the TemporaryDestination", e );
         }
  -
  -   }
  -
  -   public void asynchClose() {
  -   }
  -
  -   //called by a TemporaryDestination which is going to be deleted()
  -   public void asynchDeleteTemporaryDestination( SpyDestination dest ) {
  -      try {
  -         deleteTemporaryDestination( dest );
  -      } catch ( JMSException e ) {
  -         asynchFailure( e.getMessage(), e.getLinkedException() );
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot delete the TemporaryDestination", e);
         }
   
      }
  -
  -   //Gets the first consumer that is listening to a destination.
  -   public void asynchDeliver( ReceiveRequest requests[] ) {
   
  -      try {
  -         for ( int i = 0; i < requests.length; i++ ) {
  +   /**
  +    * #Description of the Method
  +    *
  +    * @exception JMSException  Description of Exception
  +    */
  +   public void start()
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
  +      }
   
  -            SpyConsumer consumer = ( SpyConsumer )subscriptions.get( 
requests[i].subscriptionId );
  -            
requests[i].message.createAcknowledgementRequest(requests[i].subscriptionId.intValue());
  +      if (!modeStop)
  +      {
  +         return;
  +      }
  +      modeStop = false;
   
  -            if ( consumer == null ) {
  -               send( requests[i].message.getAcknowledgementRequest( false ) );
  -               cat.debug( "WARNING: NACK issued due to non existent subscription" );
  -               continue;
  -            }
  +      cat.debug("Starting connection, ClientID=" + connectionToken.getClientID());
   
  -            consumer.addMessage( requests[i].message );
  -         }
  -      } catch ( JMSException e ) {
  -         asynchFailure( e.getMessage(), e.getLinkedException() );
  +      try
  +      {
  +         serverIL.setEnabled(connectionToken, true);
  +      }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot enable the connection with the JMS 
server", e);
         }
      }
   
  -   public void asynchFailure( String reason, Exception e ) {
  +   /**
  +    * #Description of the Method
  +    *
  +    * @exception JMSException  Description of Exception
  +    */
  +   public void stop()
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
  +      }
   
  -      JMSException excep = new SpyJMSException( reason );
  -      excep.setLinkedException( e );
  -      excep.fillInStackTrace();
  +      if (modeStop)
  +      {
  +         return;
  +      }
  +      modeStop = true;
  +
  +      cat.debug("Stoping connection, ClientID=" + connectionToken.getClientID());
   
  -      cat.debug( e );
  -      if ( exceptionListener != null ) {
  -         synchronized ( exceptionListener ) {
  -            exceptionListener.onException( excep );
  -         }
  -      } else {
  -         cat.warn( "JBossMQ Connection failure: ", excep );
  -         excep.printStackTrace();
  +      try
  +      {
  +         serverIL.setEnabled(connectionToken, false);
  +      }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot disable the connection with the JMS 
server", e);
         }
      }
   
      //ask the JMS server for a new ID
  +   /**
  +    * #Description of the Method
  +    *
  +    * @exception JMSException  Description of Exception
  +    */
      protected void askForAnID()
  -      throws JMSException {
  -      try {
  +          throws JMSException
  +   {
  +      try
  +      {
            clientID = serverIL.getID();
  -      } catch ( Exception e ) {
  -         cat.debug( "Server Exception: ", e );
  -         throw new SpyJMSException( "Cannot get a client ID: " + e.getMessage(), e 
);
         }
  -   }
  -
  -   // Used to commit/rollback a transaction.
  -   protected void send( TransactionRequest transaction )
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  +      catch (Exception e)
  +      {
  +         cat.debug("Server Exception: ", e);
  +         throw new SpyJMSException("Cannot get a client ID: " + e.getMessage(), e);
         }
  +   }
   
  -      try {
  -         serverIL.transact( connectionToken, transaction );
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot process a transaction", e );
  +   //ask the JMS server for a new ID
  +   /**
  +    * #Description of the Method
  +    *
  +    * @param userName          Description of Parameter
  +    * @param password          Description of Parameter
  +    * @exception JMSException  Description of Exception
  +    */
  +   protected void askForAnID(String userName, String password)
  +          throws JMSException
  +   {
  +      try
  +      {
  +         clientID = serverIL.checkUser(userName, password);
  +      }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot get a client ID", e);
         }
  -
      }
   
      // used to acknowledge a message
  -   protected void send( AcknowledgementRequest item )
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  -      }
  -      try {
  -         serverIL.acknowledge( connectionToken, item );
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot acknowlege a message", e );
  +   /**
  +    * #Description of the Method
  +    *
  +    * @param item              Description of Parameter
  +    * @exception JMSException  Description of Exception
  +    */
  +   protected void send(AcknowledgementRequest item)
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
  +      }
  +      try
  +      {
  +         serverIL.acknowledge(connectionToken, item);
  +      }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot acknowlege a message", e);
         }
      }
   
  -   //ask the JMS server for a new ID
  -   protected void askForAnID( String userName, String password )
  -      throws JMSException {
  -      try {
  -         clientID = serverIL.checkUser( userName, password );
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot get a client ID", e );
  +   // Used to commit/rollback a transaction.
  +   /**
  +    * #Description of the Method
  +    *
  +    * @param transaction       Description of Parameter
  +    * @exception JMSException  Description of Exception
  +    */
  +   protected void send(TransactionRequest transaction)
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
  +      }
  +
  +      try
  +      {
  +         serverIL.transact(connectionToken, transaction);
  +      }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot process a transaction", e);
         }
  +
      }
   
      ////////////////////////////////////////////////////////////////////
  @@ -415,18 +688,27 @@
      ////////////////////////////////////////////////////////////////////
   
      //create a new Distributed object which receives the messages for this connection
  +   /**
  +    * #Description of the Method
  +    *
  +    * @exception JMSException  Description of Exception
  +    */
      protected void startILService()
  -      throws JMSException {
  -      try {
  +          throws JMSException
  +   {
  +      try
  +      {
   
  -         clientILService = genericConnectionFactory.createClientILService( this );
  +         clientILService = genericConnectionFactory.createClientILService(this);
            clientILService.start();
  -         connectionToken = new ConnectionToken( clientID, 
clientILService.getClientIL() );
  -         serverIL.setConnectionToken( connectionToken );
  +         connectionToken = new ConnectionToken(clientID, 
clientILService.getClientIL());
  +         serverIL.setConnectionToken(connectionToken);
   
  -      } catch ( Exception e ) {
  -         cat.debug( e );
  -         throw new SpyJMSException( "Cannot start a the client IL service", e );
  +      }
  +      catch (Exception e)
  +      {
  +         cat.debug(e);
  +         throw new SpyJMSException("Cannot start a the client IL service", e);
         }
      }
   
  @@ -435,15 +717,24 @@
      ////////////////////////////////////////////////////////////////////
   
      //create a new Distributed object which receives the messages for this connection
  +   /**
  +    * #Description of the Method
  +    *
  +    * @exception JMSException  Description of Exception
  +    */
      protected void stopILService()
  -      throws JMSException {
  -      try {
  +          throws JMSException
  +   {
  +      try
  +      {
   
            clientILService.stop();
   
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot stop a the client IL service", e );
         }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot stop a the client IL service", e);
  +      }
      }
   
      //all longs are less than 22 digits long
  @@ -451,190 +742,305 @@
      //Note that in this routine we assume that System.currentTimeMillis() is 
non-negative
      //always be non-negative (so don't set lastMessageID to a positive for a start).
      String getNewMessageID()
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
         }
   
  -      synchronized ( sb ) {
  -         sb.setLength( 0 );
  -         sb.append( clientID );
  -         sb.append( '-' );
  +      synchronized (sb)
  +      {
  +         sb.setLength(0);
  +         sb.append(clientID);
  +         sb.append('-');
            long time = System.currentTimeMillis();
            int count = 0;
  -         do {
  -            charStack[count] = ( char )( '0' + ( time % 10 ) );
  +         do
  +         {
  +            charStack[count] = (char)('0' + (time % 10));
               time = time / 10;
               ++count;
  -         } while ( time != 0 );
  +         } while (time != 0);
            --count;
  -         for ( ; count >= 0; --count ) {
  -            sb.append( charStack[count] );
  +         for (; count >= 0; --count)
  +         {
  +            sb.append(charStack[count]);
            }
            ++lastMessageID;
            //avoid having to deal with negative numbers.
  -         if ( lastMessageID < 0 ) {
  +         if (lastMessageID < 0)
  +         {
               lastMessageID = 0;
            }
            int id = lastMessageID;
            count = 0;
  -         do {
  -            charStack[count] = ( char )( '0' + ( id % 10 ) );
  +         do
  +         {
  +            charStack[count] = (char)('0' + (id % 10));
               id = id / 10;
               ++count;
  -         } while ( id != 0 );
  +         } while (id != 0);
            --count;
  -         for ( ; count >= 0; --count ) {
  -            sb.append( charStack[count] );
  +         for (; count >= 0; --count)
  +         {
  +            sb.append(charStack[count]);
            }
            return sb.toString();
         }
      }
   
  -   //Called by a session when it is closing
  -   void sessionClosing( SpySession who ) {
  -      synchronized ( createdSessions ) {
  -         createdSessions.remove( who );
  -      }
  -
  -      //This session should not be in the "destinations" object anymore.
  -      //We could check this, though
  -   }
  -
  -   //Send a message to the serverIL
  -   void sendToServer( SpyMessage mes )
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  -      }
  -
  -      try {
  -
  -         serverIL.addMessage( connectionToken, mes );
  -
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot send a message to the JMS server", e );
  -      }
  -   }
  -
      //A new Consumer has been created for the Destination dest
  -   void addConsumer( SpyConsumer consumer )
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  +   void addConsumer(SpyConsumer consumer)
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
         }
   
         Subscription req = consumer.getSubscription();
         req.subscriptionId = subscriptionCounter++;
         req.dc = connectionToken;
   
  -      cat.debug( "Connection: addConsumer(dest=" + req.destination.toString() + ")" 
);
  +      cat.debug("Connection: addConsumer(dest=" + req.destination.toString() + ")");
   
  -      try {
  +      try
  +      {
   
  -         synchronized ( subscriptions ) {
  +         synchronized (subscriptions)
  +         {
   
  -            subscriptions.put( new Integer( req.subscriptionId ), consumer );
  +            subscriptions.put(new Integer(req.subscriptionId), consumer);
   
  -            LinkedList ll = ( LinkedList )destinationSubscriptions.get( 
req.destination );
  -            if ( ll == null ) {
  +            LinkedList ll = 
(LinkedList)destinationSubscriptions.get(req.destination);
  +            if (ll == null)
  +            {
                  ll = new LinkedList();
  -               destinationSubscriptions.put( req.destination, ll );
  +               destinationSubscriptions.put(req.destination, ll);
               }
   
  -            ll.add( consumer );
  +            ll.add(consumer);
            }
   
  -         serverIL.subscribe( connectionToken, req );
  +         serverIL.subscribe(connectionToken, req);
   
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot subscribe to this Destination: " + 
e.getMessage(), e );
         }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot subscribe to this Destination: " + 
e.getMessage(), e);
  +      }
   
      }
   
  -
      /**
  -    * @param  queue             Description of Parameter
  -    * @param  selector          Description of Parameter
  -    * @return                   org.jboss.mq.distributed.interfaces.ServerIL
  -    * @exception  JMSException  Description of Exception
  +    * @param queue             Description of Parameter
  +    * @param selector          Description of Parameter
  +    * @return                  org.jboss.mq.distributed.interfaces.ServerIL
  +    * @exception JMSException  Description of Exception
       */
  -   SpyMessage[] browse( Queue queue, String selector )
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  +   SpyMessage[] browse(Queue queue, String selector)
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
         }
   
  -      try {
  -         return serverIL.browse( connectionToken, queue, selector );
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot browse the Queue", e );
  +      try
  +      {
  +         return serverIL.browse(connectionToken, queue, selector);
  +      }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot browse the Queue", e);
         }
      }
   
  +   //Send a message to the serverIL
  +   void pingServer(long clientTime)
  +          throws JMSException
  +   {
  +
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
  +      }
  +
  +      try
  +      {
  +
  +         cat.debug("PING");
  +         serverIL.ping(connectionToken, clientTime);
  +
  +      }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot ping the JMS server", e);
  +      }
  +   }
  +
      /**
  -    * @param  sub               Description of Parameter
  -    * @param  wait              Description of Parameter
  -    * @return                   org.jboss.mq.distributed.interfaces.ServerIL
  -    * @exception  JMSException  Description of Exception
  +    * @param sub               Description of Parameter
  +    * @param wait              Description of Parameter
  +    * @return                  org.jboss.mq.distributed.interfaces.ServerIL
  +    * @exception JMSException  Description of Exception
       */
  -   SpyMessage receive( Subscription sub, long wait )
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  +   SpyMessage receive(Subscription sub, long wait)
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
         }
   
  -      try {
  -         SpyMessage message = serverIL.receive( connectionToken, 
sub.subscriptionId, wait );
  -         if ( message != null ) {
  +      try
  +      {
  +         SpyMessage message = serverIL.receive(connectionToken, sub.subscriptionId, 
wait);
  +         if (message != null)
  +         {
               message.createAcknowledgementRequest(sub.subscriptionId);
            }
            return message;
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot create a ConnectionReceiver", e );
         }
  -   }
  -
  -   void unsubscribe( DurableSubcriptionID id )
  -      throws JMSException {
  -      try {
  -         serverIL.destroySubscription( id );
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot destroy durable subscription " + id, e 
);
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot create a ConnectionReceiver", e);
         }
      }
   
      //A consumer does not need to recieve the messages from a Destination
  -   void removeConsumer( SpyConsumer consumer )
  -      throws JMSException {
  -      if ( closed ) {
  -         throw new IllegalStateException( "The connection is closed" );
  +   void removeConsumer(SpyConsumer consumer)
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
         }
   
         Subscription req = consumer.getSubscription();
  -      cat.debug( "Connection: removeSession(dest=" + req.destination + ")" );
  +      cat.debug("Connection: removeSession(dest=" + req.destination + ")");
   
  -      try {
  +      try
  +      {
   
  -         serverIL.unsubscribe( connectionToken, req.subscriptionId );
  +         serverIL.unsubscribe(connectionToken, req.subscriptionId);
   
  -         synchronized ( subscriptions ) {
  +         synchronized (subscriptions)
  +         {
   
  -            subscriptions.remove( new Integer( req.subscriptionId ) );
  +            subscriptions.remove(new Integer(req.subscriptionId));
   
  -            LinkedList ll = ( LinkedList )destinationSubscriptions.get( 
req.destination );
  -            if ( ll != null ) {
  -               ll.remove( req );
  -               if ( ll.size() == 0 ) {
  -                  destinationSubscriptions.remove( req.destination );
  +            LinkedList ll = 
(LinkedList)destinationSubscriptions.get(req.destination);
  +            if (ll != null)
  +            {
  +               ll.remove(req);
  +               if (ll.size() == 0)
  +               {
  +                  destinationSubscriptions.remove(req.destination);
                  }
               }
            }
   
  -      } catch ( Exception e ) {
  -         throw new SpyJMSException( "Cannot unsubscribe to this destination", e );
  +      }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot unsubscribe to this destination", e);
  +      }
  +
  +   }
  +
  +   //Send a message to the serverIL
  +   void sendToServer(SpyMessage mes)
  +          throws JMSException
  +   {
  +      if (closed)
  +      {
  +         throw new IllegalStateException("The connection is closed");
         }
   
  +      try
  +      {
  +
  +         serverIL.addMessage(connectionToken, mes);
  +
  +      }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot send a message to the JMS server", e);
  +      }
  +   }
  +
  +   //Called by a session when it is closing
  +   void sessionClosing(SpySession who)
  +   {
  +      synchronized (createdSessions)
  +      {
  +         createdSessions.remove(who);
  +      }
  +
  +      //This session should not be in the "destinations" object anymore.
  +      //We could check this, though
  +   }
  +
  +   void unsubscribe(DurableSubcriptionID id)
  +          throws JMSException
  +   {
  +      try
  +      {
  +         serverIL.destroySubscription(id);
  +      }
  +      catch (Exception e)
  +      {
  +         throw new SpyJMSException("Cannot destroy durable subscription " + id, e);
  +      }
  +   }
  +
  +   /**
  +    * #Description of the Class
  +    */
  +   class PingTask implements Runnable
  +   {
  +      /**
  +       * Main processing method for the PingTask object
  +       */
  +      public void run()
  +      {
  +
  +         // Ping until we are closed or interrupted.
  +         try
  +         {
  +            pingTaskSemaphore.release();
  +            while (!closed)
  +            {
  +
  +               ponged = false;
  +               pingServer(System.currentTimeMillis());
  +               Thread.currentThread().sleep(PING_INTERVAL);
  +               // Wait for a ping time out..
  +
  +               if (ponged == false)
  +               {
  +                  // Server did not pong use with in the timeout
  +                  // period..  Assuming the connection is dead.
  +                  throw new SpyJMSException("", new IOException("ping timeout."));
  +               }
  +
  +            }
  +         }
  +         catch (JMSException e)
  +         {
  +            asynchFailure("Connection Failed", e.getLinkedException());
  +         }
  +         catch (InterruptedException e)
  +         {
  +         }
  +         finally
  +         {
  +            // Used to signal that we are done..
  +            cat.debug("PingTask closing.");
  +            pingTaskSemaphore.release();
  +         }
  +      }
      }
   }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to