User: pra
Date: 01/08/30 13:40:04
Modified: src/main/org/jboss/ejb/plugins/jms JMSContainerInvoker.java
Added: src/main/org/jboss/ejb/plugins/jms DLQHandler.java
Log:
Now handles tight loop with resent messages by sending offending messages to a dead
letter queue (queue/DLQ)
Revision Changes Path
1.28 +23 -1
jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
Index: JMSContainerInvoker.java
===================================================================
RCS file:
/cvsroot/jboss/jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -r1.27 -r1.28
--- JMSContainerInvoker.java 2001/08/23 18:17:18 1.27
+++ JMSContainerInvoker.java 2001/08/30 20:40:04 1.28
@@ -60,7 +60,7 @@
* @author <a href="mailto:[EMAIL PROTECTED]">Sebastien Alborini</a>
* @author <a href="mailto:[EMAIL PROTECTED]">Marc Fleury</a>
* @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a>
- * @version $Revision: 1.27 $
+ * @version $Revision: 1.28 $
*/
public class JMSContainerInvoker
implements ContainerInvoker, XmlLoadable
@@ -110,6 +110,8 @@
protected ServerSessionPool pool;
protected ExceptionListenerImpl exListener;
protected String beanName;
+ protected DLQHandler dlqHandler;
+ protected Element mdbConfig;
// Static --------------------------------------------------------
@@ -375,6 +377,11 @@
{
log.debug("initializing");
+ // Set up Dead Letter Queue handler
+ dlqHandler = new DLQHandler();
+ dlqHandler.importXml(mdbConfig);
+ dlqHandler.init();
+
// Store TM reference locally - should we test for CMT Required
tm = container.getTransactionManager();
@@ -572,6 +579,9 @@
{
log.debug("Destroying JMSContainerInvoker for bean " + beanName);
+ // Take down DLQ
+ dlqHandler.destroy();
+
// close the connection consumer
try {
if (connectionConsumer != null) {
@@ -633,6 +643,9 @@
if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
serverSessionPoolFactoryJNDI = "java:/"+serverSessionPoolFactoryJNDI;
+
+ // Get MDBConfig
+ mdbConfig = (Element)MetaData.getUniqueChild(element,
"MDBConfig").cloneNode(true);
}
// Package protected ---------------------------------------------
@@ -685,8 +698,17 @@
id = "JMSContainerInvoker";
}
+
+
// Invoke, shuld we catch any Exceptions??
try {
+ // DLQHandling
+ if (message.getJMSRedelivered() &&
dlqHandler.handleRedeliveredMessage(message)) {
+ // Message will be placed on Dead Letter Queue,
+ // if redelivered to many times
+ return;
+ }
+
invoker.invoke(id, // Object id - where used?
ON_MESSAGE, // Method to invoke
new Object[] {message}, // argument
1.1 jboss/src/main/org/jboss/ejb/plugins/jms/DLQHandler.java
Index: DLQHandler.java
===================================================================
/*
* JBoss, the OpenSource EJB server
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.ejb.plugins.jms;
import java.util.Hashtable;
import java.util.Enumeration;
import javax.naming.InitialContext;
import javax.naming.Context;
import javax.jms.Session;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.QueueSender;
import javax.jms.Queue;
import javax.jms.Message;
import javax.jms.JMSException;
import org.w3c.dom.Element;
import org.apache.log4j.Category;
import org.jboss.ejb.DeploymentException;
import org.jboss.metadata.MetaData;
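// Why can't I implement Service: org.jboss.ejb.plugins.jms.DLQHandler cannot
// implement destroy() in org.jboss.util.Service; attempting to assign weaker access
// privileges; was public
// (Interface methods are implicitly public, so the implementing methods would
// have to be declared public as well.)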
//import org.jboss.util.Service;
/**
 * Places redelivered messages on a Dead Letter Queue.
 *
 * <p>The Dead Letter Queue handler keeps JBoss from entering an endless loop
 * when a message is resent over and over due to transaction rollback on
 * message receipt.
 *
 * <p>It sends the message to a dead letter queue (configurable, defaults to
 * queue/DLQ) when the message has been resent a configurable number of times,
 * defaults to 10.
 *
 * <p>The handler is configured through the element MDBConfig in
 * container-invoker-conf.
 *
 * <p>The JMS property JBOSS_ORIG_DESTINATION in the resent message is set
 * to the name of the original destination (Destination.toString()).
 *
 * <p>The JMS property JBOSS_ORIG_MESSAGEID in the resent message is set
 * to the id of the original message.
 *
 * <p>Redelivery counters are held in memory, keyed by JMS message id. An
 * entry is removed only after the message has been forwarded to the dead
 * letter queue; counters of messages that eventually succeed stay in the
 * buffer for the lifetime of the handler.
 *
 * Created: Thu Aug 23 21:17:26 2001
 *
 * @author
 * @version $Revision: 1.1 $ $Date: 2001/08/30 20:40:04 $
 */
public class DLQHandler {
    /** Class logger. */
    private static final Category log =
        Category.getInstance(DLQHandler.class);

    /** JMS property name holding original destination. */
    public static final String JBOSS_ORIG_DESTINATION = "JBOSS_ORIG_DESTINATION";

    /** JMS property name holding original JMS message id. */
    public static final String JBOSS_ORIG_MESSAGEID = "JBOSS_ORIG_MESSAGEID";

    /**
     * Connection factory JNDI, java:/ConnectionFactory, should we make it
     * configurable?
     */
    private static final String FACTORY_JNDI = "java:/ConnectionFactory";

    // Configurable stuff

    /**
     * Destination to send dead letters to.
     *
     * <p>Initialised to queue/DLQ and overwritten by the DestinationQueue
     * element in {@link #importXml(Element)}.
     */
    private String destinationJNDI = "queue/DLQ";

    /**
     * Maximum times a message is allowed to be resent.
     *
     * <p>Defaults to 10, configurable through MaxTimesRedelivered element.
     */
    private int maxResent = 10;

    /**
     * Time to live for the message.
     *
     * <p>Defaults to Message.DEFAULT_TIME_TO_LIVE, configurable through
     * the TimeToLive element.
     */
    private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;

    // May become configurable

    /** Delivery mode for message, Message.DEFAULT_DELIVERY_MODE. */
    private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;

    /** Priority for the message, Message.DEFAULT_PRIORITY. */
    private int priority = Message.DEFAULT_PRIORITY;

    // Private stuff

    /** Connection used to create the per-send sessions; set by init(). */
    private QueueConnection connection;

    /** The dead letter queue looked up from destinationJNDI; set by init(). */
    private Queue dlq;

    /** Redelivery counters: JMS message id (String) -> BufferEntry. */
    private Hashtable resentBuffer = new Hashtable();

    public DLQHandler() {
    }

    //--- Service

    /**
     * Initialize the service: look up the connection factory and the dead
     * letter queue in JNDI and open the queue connection.
     *
     * @throws Exception Service failed to initialize.
     */
    void init() throws Exception {
        Context ctx = new InitialContext();
        try {
            QueueConnectionFactory factory = (QueueConnectionFactory)
                ctx.lookup(FACTORY_JNDI);
            // Resolve the queue before opening the connection so that a bad
            // destination name cannot leave an open connection behind.
            dlq = (Queue) ctx.lookup(destinationJNDI);
            connection = factory.createQueueConnection();
            log.debug("Created Dead Letter Queue connection " + dlq);
        } finally {
            try {
                ctx.close();
            } catch (Exception ignored) {
                // Best effort: the lookups are done, nothing to recover.
            }
        }
    }

    /**
     * Start the service.
     *
     * @throws Exception Service failed to start.
     */
    void start() throws Exception {
    }

    /**
     * Stop the service.
     */
    void stop() {
    }

    /**
     * Destroy the service by closing the dead letter queue connection.
     *
     * <p>Does nothing if {@link #init()} never created a connection. A
     * failure to close is logged and otherwise ignored.
     */
    void destroy() {
        if (connection == null) {
            return;
        }
        try {
            // close(), not stop(): stop() only pauses inbound delivery and
            // leaves the connection's resources allocated.
            connection.close();
        } catch (Exception ex) {
            log.debug("Could not close Dead Letter Queue connection", ex);
        }
    }

    //--- Logic

    /**
     * Check if a message has been redelivered too many times.
     *
     * <p>Each call increments the counter kept for the message id. Once the
     * counter exceeds the configured maximum, the message is sent to the
     * dead letter queue (default queue/DLQ) and its counter is dropped.
     *
     * @param msg the redelivered message.
     * @return true if message is handled, i.e. resent to the dead letter
     *         queue; false if not (including when the id is null or the
     *         send failed with a JMSException).
     */
    public boolean handleRedeliveredMessage(Message msg) {
        try {
            String id = msg.getJMSMessageID();
            if (id == null) {
                // Without an id the redeliveries cannot be counted.
                log.debug("Message id is null, can't handle message");
                return false;
            }
            if (incrementResentCount(id) > maxResent) {
                log.info("Message resent to many time, sending it to DLQ. Id: "
                         + id);
                sendMessage(msg);
                // Only forget the counter once the send succeeded, so a
                // failed send is retried on the next redelivery.
                deleteFromBuffer(id);
                return true;
            }
        } catch (JMSException ex) {
            // If we can't send it ahead, we do not dare to just drop it.
            log.error("Could not send message to Dead Letter Queue " + ex, ex);
        }
        return false;
    }

    //--- Private helper stuff

    /**
     * Increment the counter for the specific JMS message id, creating it
     * with the value 1 if the id has not been seen before.
     *
     * @param id the JMS message id.
     * @return the new counter value.
     */
    protected int incrementResentCount(String id) {
        BufferEntry entry = (BufferEntry) resentBuffer.get(id);
        if (entry == null) {
            log.debug("Making new entry for id " + id);
            entry = new BufferEntry();
            entry.id = id;
            entry.count = 1;
            resentBuffer.put(id, entry);
        } else {
            entry.count++;
            log.debug("Incremented old entry for id " + id + " count " + entry.count);
        }
        return entry.count;
    }

    /**
     * Delete the entry in the message counter buffer for the specified JMS id.
     *
     * @param id the JMS message id.
     */
    protected void deleteFromBuffer(String id) {
        resentBuffer.remove(id);
    }

    /**
     * Send message to the configured dead letter queue, defaults to queue/DLQ.
     *
     * <p>The original message id and destination are recorded in the
     * properties JBOSS_ORIG_MESSAGEID and JBOSS_ORIG_DESTINATION before
     * sending. The destination property is omitted if the message carries
     * no destination.
     *
     * @param msg the message to forward; its properties are rewritten.
     * @throws JMSException if the message could not be prepared or sent.
     */
    protected void sendMessage(Message msg) throws JMSException {
        QueueSession ses = null;
        QueueSender sender = null;
        try {
            msg = makeWritable(msg); // Don't know yet if we are going to clone or not
            msg.setStringProperty(JBOSS_ORIG_MESSAGEID,
                                  msg.getJMSMessageID());
            Object origDestination = msg.getJMSDestination();
            if (origDestination != null) {
                msg.setStringProperty(JBOSS_ORIG_DESTINATION,
                                      origDestination.toString());
            }
            ses = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            sender = ses.createSender(dlq);
            log.debug("Resending DLQ message to destination" + dlq);
            sender.send(msg, deliveryMode, priority, timeToLive);
        } finally {
            // Close each resource on its own so that a missing or failing
            // sender never prevents the session from being closed.
            if (sender != null) {
                try {
                    sender.close();
                } catch (Exception ignored) {
                    log.debug("Could not close Dead Letter Queue sender", ignored);
                }
            }
            if (ses != null) {
                try {
                    ses.close();
                } catch (Exception ignored) {
                    log.debug("Could not close Dead Letter Queue session", ignored);
                }
            }
        }
    }

    /**
     * Make the Message properties writable.
     *
     * <p>The properties are copied aside, cleared on the message and then
     * set again. Values are copied as objects so their property type is
     * kept; a property whose value is null is not restored.
     *
     * @param msg the message whose properties should become writable.
     * @return the writable message (the same instance that was passed in).
     * @throws JMSException if a property could not be read or written.
     */
    protected Message makeWritable(Message msg) throws JMSException {
        Hashtable saved = new Hashtable();

        // Save properties
        for (Enumeration names = msg.getPropertyNames(); names.hasMoreElements();) {
            String key = (String) names.nextElement();
            Object value = msg.getObjectProperty(key);
            // Hashtable rejects null values
            if (value != null) {
                saved.put(key, value);
            }
        }

        // Make them writable
        msg.clearProperties();

        for (Enumeration keys = saved.keys(); keys.hasMoreElements();) {
            String key = (String) keys.nextElement();
            msg.setObjectProperty(key, saved.get(key));
        }
        return msg;
    }

    /**
     * Takes an MDBConfig Element and reads the handler configuration from
     * its DestinationQueue, MaxTimesRedelivered and TimeToLive children.
     *
     * <p>MaxTimesRedelivered and TimeToLive are read independently of each
     * other; when one cannot be read or parsed its current value is kept.
     *
     * @param element the MDBConfig element.
     * @throws DeploymentException propagated from MetaData when the
     *         DestinationQueue element cannot be read.
     */
    public void importXml(Element element) throws DeploymentException {
        destinationJNDI = MetaData.getElementContent
            (MetaData.getUniqueChild(element, "DestinationQueue"));

        try {
            String mr = MetaData.getElementContent
                (MetaData.getUniqueChild(element, "MaxTimesRedelivered"));
            maxResent = Integer.parseInt(mr);
        } catch (NumberFormatException ignored) {
            // Noop will take default value
        } catch (DeploymentException ignored) {
            // Noop will take default value
        }

        try {
            String ttl = MetaData.getElementContent
                (MetaData.getUniqueChild(element, "TimeToLive"));
            timeToLive = Long.parseLong(ttl);
        } catch (NumberFormatException ignored) {
            // Noop will take default value
        } catch (DeploymentException ignored) {
            // Noop will take default value
        }
    }

    /** Redelivery counter for a single JMS message id. */
    private static class BufferEntry {
        int count;
        String id;
    }
} // DLQHandler
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development