User: pra     
  Date: 01/08/30 13:40:04

  Modified:    src/main/org/jboss/ejb/plugins/jms JMSContainerInvoker.java
  Added:       src/main/org/jboss/ejb/plugins/jms DLQHandler.java
  Log:
  Now handles tight loop with resent messages by sending offending messages to a dead 
letter queue (queue/DLQ)
  
  Revision  Changes    Path
  1.28      +23 -1     
jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
  
  Index: JMSContainerInvoker.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java,v
  retrieving revision 1.27
  retrieving revision 1.28
  diff -u -r1.27 -r1.28
  --- JMSContainerInvoker.java  2001/08/23 18:17:18     1.27
  +++ JMSContainerInvoker.java  2001/08/30 20:40:04     1.28
  @@ -60,7 +60,7 @@
    * @author <a href="mailto:[EMAIL PROTECTED]";>Sebastien Alborini</a>
    * @author <a href="mailto:[EMAIL PROTECTED]";>Marc Fleury</a>
    * @author <a href="mailto:[EMAIL PROTECTED]";>Jason Dillon</a>
  - * @version $Revision: 1.27 $
  + * @version $Revision: 1.28 $
    */
   public class JMSContainerInvoker
         implements ContainerInvoker, XmlLoadable
  @@ -110,6 +110,8 @@
      protected ServerSessionPool pool;
      protected ExceptionListenerImpl exListener;
      protected String beanName;
  +   protected DLQHandler dlqHandler;
  +   protected Element mdbConfig;
   
      // Static --------------------------------------------------------
   
  @@ -375,6 +377,11 @@
      {
         log.debug("initializing");
   
  +      // Set up Dead Letter Queue handler
  +      dlqHandler = new DLQHandler();
  +      dlqHandler.importXml(mdbConfig);
  +      dlqHandler.init();
  +
         // Store TM reference locally - should we test for CMT Required
         tm = container.getTransactionManager();
   
  @@ -572,6 +579,9 @@
      {
         log.debug("Destroying JMSContainerInvoker for bean " + beanName);
   
  +      // Take down DLQ
  +      dlqHandler.destroy();
  +
         // close the connection consumer
         try {
         if (connectionConsumer != null) {
  @@ -633,6 +643,9 @@
   
         if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
         serverSessionPoolFactoryJNDI = "java:/"+serverSessionPoolFactoryJNDI;
  +
  +      // Get MDBConfig
  +      mdbConfig = (Element)MetaData.getUniqueChild(element, 
"MDBConfig").cloneNode(true);
      }
   
      // Package protected ---------------------------------------------
  @@ -685,8 +698,17 @@
            id = "JMSContainerInvoker";
         }
   
  +
  +
         // Invoke, shuld we catch any Exceptions??
         try {
  +         // DLQHandling
  +         if (message.getJMSRedelivered() && 
dlqHandler.handleRedeliveredMessage(message)) {
  +            // Message will be placed on Dead Letter Queue, 
  +            // if redelivered to many times
  +            return;
  +         }
  +
            invoker.invoke(id,                     // Object id - where used?
                           ON_MESSAGE,             // Method to invoke
                           new Object[] {message}, // argument
  
  
  
  1.1                  jboss/src/main/org/jboss/ejb/plugins/jms/DLQHandler.java
  
  Index: DLQHandler.java
  ===================================================================
  /*
  * JBoss, the OpenSource EJB server
  *
  * Distributable under LGPL license.
  * See terms of license at gnu.org.
  */
  package org.jboss.ejb.plugins.jms;
  
  import java.util.Hashtable;
  import java.util.Enumeration;
  
  import javax.naming.InitialContext;
  import javax.naming.Context;
  
  import javax.jms.Session;
  import javax.jms.QueueConnection;
  import javax.jms.QueueConnectionFactory;
  import javax.jms.QueueSession;
  import javax.jms.QueueSender;
  import javax.jms.Queue;
  import javax.jms.Message;
  import javax.jms.JMSException;
  
  import org.w3c.dom.Element;
  
  import org.apache.log4j.Category;
  
  import org.jboss.ejb.DeploymentException;
  import org.jboss.metadata.MetaData;
  
  
  // Why can't I implement Service: org.jboss.ejb.plugins.jms.DLQHandler cannot 
implement destroy() in org.jboss.util.Service; attempting to assign weaker access 
privileges; was public
  //import org.jboss.util.Service;
  
  /**
   * Places redeliveded messages on a Dead Letter Queue.
   *
   *<p>The Dead Letter Queue handler is used to not set JBoss in an endles loop
   * when a message is resent on and on due to transaction rollback for
   * message receipt.
   *
   *<p>It sends message to a dead letter queue (configurable, defaults to 
   * queue/DLQ) when the message has been resent a configurable amount of times,
   * defaults to 10.
   *
   * <p>The handler is configured through the element MDBConfig in 
   * container-invoker-conf.
   *
   * <p>The JMS property JBOSS_ORIG_DESTINATION in the resent message is set 
   * to the name of the original destination (Destionation.toString()).
   *
   * <p>The JMS property JBOSS_ORIG_MESSAGEID in the resent message is set 
   * to the id of the original message.
   *
   *
   * Created: Thu Aug 23 21:17:26 2001
   *
   * @author 
   * @version $Revision: 1.1 $ $Date: 2001/08/30 20:40:04 $
   */
  
  public class DLQHandler {
     /** Class logger. */
     private static Category log =
        Category.getInstance(DLQHandler.class);
  
     /** JMS property name holding original destination. */
     public static final String JBOSS_ORIG_DESTINATION ="JBOSS_ORIG_DESTINATION";
  
     /** JMS property name holding original JMS message id. */
     public static final String JBOSS_ORIG_MESSAGEID="JBOSS_ORIG_MESSAGEID";
     
     /** Connection factory JNDI, java:/ConnectionFactory, should we make it 
configurable? */
     private static final String FACTORY_JNDI="java:/ConnectionFactory";
        
     // Configuratable stuff
     /** 
      *  Destination to send dead letters to.
      *<p>Defaults to queue/DLQ, 
      * configurable through DestinationQueue element.
      */
     private String destinationJNDI = "queue/DLQ";
  
     /**
      * Maximum times a message is alowed to be resent. 
      *
      * <p>Defaults to 10, configurable through MaxTimesRedelivered element.
      */
     private int maxResent = 10;
  
     /**
      * Time to live for the message.
      *
      *<p>Defaults to Message.DEFAULT_TIME_TO_LIVE, configurable through 
      * the TimeToLive element.
      */
     private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
  
     // May become configurable
     /** Delivery mode for message, Message.DEFAULT_DELIVERY_MODE. */
     private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
     /** Priority for the message, Message.DEFAULT_PRIORITY */
     private int priority = Message.DEFAULT_PRIORITY;
  
     // Private stuff
     private QueueConnection connection;
     private Queue dlq;
     private Hashtable resentBuffer = new Hashtable();
     
     public DLQHandler() {
        
     }
  
     
     //--- Service
     /**
      * Initalize the service.
      *
      * @throws Exception    Service failed to initalize.
      */
     void init() throws Exception {
        Context ctx = new InitialContext();
        QueueConnectionFactory factory = (QueueConnectionFactory)
         ctx.lookup(FACTORY_JNDI);
        
        connection = factory.createQueueConnection();
        dlq = (Queue)ctx.lookup(destinationJNDI);
        log.debug("Created Dead Letter Queue connection " + dlq);
     }
  
     /**
      * Start the service.
      *
      * @throws Exception    Service failed to start.
      */
     void start() throws Exception {
  
     }
  
     /**
      * Stop the service.
      */
     void stop() {
  
     }
     
     /**
      * Destroy the service.
      */
     void destroy(){
        try {
         connection.stop();
        }catch(Exception ex) {}
     }
  
     //--- Logic
     /**
      * Check if a message has been redelivered to many times.
      *
      * If message has been redelivered to many times, send it to the 
      * dead letter queue (default to queue/DLQ).
      *
      * @return true if message is handled, i.e resent, false if not.
      */
     public boolean handleRedeliveredMessage(Message msg) {
        try {
         String id = msg.getJMSMessageID();
         if (id == null) {
            // Warning function
            log.debug("Message id is null, can't handle message");
         }
         // if we can't get the id we are basically fucked
         if(id != null && incrementResentCount(id) > maxResent) {
            log.info("Message resent to many time, sending it to DLQ. Id: " 
                     + id);
            sendMessage(msg);
            deleteFromBuffer(id);
            return true;
         } 
        }catch(JMSException ex) {
         // If we can't send it ahead, we do not dare to just drop it...or?
         log.error("Could not send message to Dead Letter Queue " + ex,ex);
         return false;
        }
        return false;
        
     }
  
     //--- Private helper stuff
     /**
      * Increment the counter for the specific JMS message id.
      *
      * @return the new counter value.
      */
     protected int incrementResentCount(String id) {
        BufferEntry entry = null;
        if(!resentBuffer.containsKey(id)) {
         log.debug("Making new entry for id " + id);
         entry = new BufferEntry();
         entry.id = id;
         entry.count = 1;
         resentBuffer.put(id,entry);
        } else {
         entry = (BufferEntry)resentBuffer.get(id);
         entry.count++;
         log.debug("Incremented old entry for id " + id + " count " + entry.count);
        }
        return entry.count;
     }
  
     /**
      * Delete the entry in the message counter buffer for specifyed JMS id.
      */
     protected void deleteFromBuffer(String id) {
        resentBuffer.remove(id);
     }
  
     /**
      * Send message to the configured dead letter queue, defaults to queue/DLQ.
      */
     protected void sendMessage(Message msg) throws JMSException {
        // Set the properties
        QueueSession ses = null;
        QueueSender sender = null;
        try {
         msg = makeWritable(msg);//Don't know yet if we are gona clone or not
         msg.setStringProperty(JBOSS_ORIG_MESSAGEID,
                               msg.getJMSMessageID());
         msg.setStringProperty(JBOSS_ORIG_DESTINATION,
                              msg.getJMSDestination().toString());
         
         ses = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
         sender = ses.createSender(dlq);
         log.debug("Resending DLQ message to destination" + dlq);
         sender.send(msg,deliveryMode,priority,timeToLive);
        }finally {
         try {
            sender.close();
            ses.close();
         }catch(Exception ex) {}
        }
        
     }
  
     /**
      * Make the Message properties writable.
      *
      * @return the writable message.
      */
     protected Message makeWritable(Message msg) throws JMSException {
  
        Hashtable tmp = new Hashtable();
        // Save properties
        for(Enumeration en = msg.getPropertyNames();en.hasMoreElements();)
        {
           String key = (String) en.nextElement();
           tmp.put(key,msg.getStringProperty(key));
        }
        // Make them writable
        msg.clearProperties();
  
        Enumeration keys = tmp.keys();
  
        while(keys.hasMoreElements())
        {
           String key = (String) keys.nextElement();
           msg.setStringProperty(key,(String)tmp.get(key));
        }
        return msg;
     }
     
     /**
      * Takes an MDBConfig Element
      */ 
     public void importXml(Element element) throws DeploymentException
     {
        destinationJNDI  = MetaData.getElementContent
         (MetaData.getUniqueChild(element, "DestinationQueue"));
        
        try {
         String mr = MetaData.getElementContent
            (MetaData.getUniqueChild(element, "MaxTimesRedelivered"));
         maxResent = Integer.parseInt(mr);
         
         String ttl = MetaData.getElementContent
            (MetaData.getUniqueChild(element, "TimeToLive"));
         timeToLive = Long.parseLong(ttl);
        } catch (NumberFormatException e) {
         //Noop will take default value
        } catch (DeploymentException e) {
         //Noop will take default value
        }
  
     }
  
     private class BufferEntry {
        int count;
        String id;
     }
  } // DLQHandler
  
  
  
  
  
  
  
  
  
  
  
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to