jstrachan 2002/10/21 13:31:28
Modified: messenger/src/java/org/apache/commons/messagelet Main.java
ManagerServlet.java
messenger/src/java/org/apache/commons/messagelet/model
SubscriptionDigester.java Subscription.java
messenger maven.xml
messenger/src/conf subscribe.xml
Added: messenger/src/java/org/apache/commons/messagelet
SubscriptionManager.java ConsumerThread.java
Log:
Refactored the subscription mechanism into a reusable SubscriptionManager. This
allows code to be shared between the stand alone messagelet engine and the
ManagerServlet.
Also added an option for a <consumerThread> which allows for a plugin point for
performing transactional message consumption
Revision Changes Path
1.6 +5 -73
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/Main.java
Index: Main.java
===================================================================
RCS file:
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/Main.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- Main.java 7 Oct 2002 10:38:23 -0000 1.5
+++ Main.java 21 Oct 2002 20:31:27 -0000 1.6
@@ -92,11 +92,12 @@
public void run() throws Exception {
// force lazy construction
- getMessengerManager();
+ SubscriptionManager subscriber = new SubscriptionManager();
+ subscriber.setMessengerManager( getMessengerManager() );
+ subscriber.setSubscriptionList( createSubscriptionList() );
+ subscriber.setServletContext( getServletContext() );
- // load the subscriptions....
- subscriptionList = createSubscriptionList();
- subscribe( subscriptionList );
+ subscriber.subscribe();
// now lets start all the connections...
for (Iterator iter = manager.getMessengerNames(); iter.hasNext(); ) {
@@ -152,65 +153,6 @@
// Implementation methods
//-------------------------------------------------------------------------
- protected void subscribe( SubscriptionList list ) throws JMSException,
ServletException {
- for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) {
- Subscription subscription = (Subscription) iter.next();
- subscribe( subscription );
- }
- }
-
- protected void subscribe( Subscription subscription ) throws JMSException,
ServletException {
- String name = subscription.getConnection();
- Messenger messenger = getMessenger( name );
- if ( messenger == null ) {
- throw new JMSException( "No such Messenger called: " + name + " for
subscription: " + subscription );
- }
- MessageListener listener = subscription.getMessageListener();
- if ( listener == null ) {
- throw new JMSException( "No MessageListener is defined for
subscription: " + subscription );
- }
-
- // if its an MDO the initialise it!
- if ( listener instanceof MessageDrivenObject ) {
- MessageDrivenObject mdo = (MessageDrivenObject) listener;
- if ( mdo instanceof MessengerMDO ) {
- MessengerMDO messengerMDO = (MessengerMDO) mdo;
- messengerMDO.setMessenger( messenger );
- messengerMDO.setMessengerManager( getMessengerManager() );
- }
- mdo.init( getServletContext() );
- }
-
- listener = wrapInStopWatch( listener );
-
- String subject = subscription.getSubject();
- if ( subject == null || subject.length() == 0 ) {
- throw new JMSException( "No destination defined for subscription: " +
subscription );
- }
-
- Destination destination = messenger.getDestination( subject );
- if ( destination == null ) {
- throw new JMSException( "No destination could be found for name: " +
subject + " for subscription: " + subscription );
- }
-
- // #### at this point we may wish to create a thread pool of multiple
threads
- // #### each consuming from the same Destination in parallel
-
- String selector = subscription.getSelector();
- if ( selector != null && selector.length() > 0 ) {
- log.info( "Subscribing to messenger: " + name + " destination: " +
subject + " selector: " + selector );
-
- messenger.addListener( destination, selector, listener );
- }
- else {
- log.info( "Subscribing to messenger: " + name + " destination: " +
subject );
-
- messenger.addListener( destination, listener );
- }
-
- log.info( "Subscribed with listener: " + listener );
- }
-
protected MessengerManager createMessengerManager() throws JMSException {
String config = connectionsConfig;
@@ -252,16 +194,6 @@
return null;
}
- /**
- * Allows the MessageListener to be wrapped inside a stop watch message
listener if required
- */
- protected MessageListener wrapInStopWatch( MessageListener listener ) {
- if ( useStopWatch ) {
- return new StopWatchMessageListener( listener );
- }
- return listener;
- }
-
/**
* This method blocks the current thread indefinitely until the JVM is
terminated.
*/
1.14 +51 -200
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ManagerServlet.java
Index: ManagerServlet.java
===================================================================
RCS file:
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ManagerServlet.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -r1.13 -r1.14
--- ManagerServlet.java 13 Aug 2002 07:19:10 -0000 1.13
+++ ManagerServlet.java 21 Oct 2002 20:31:27 -0000 1.14
@@ -40,14 +40,12 @@
*/
public class ManagerServlet extends GenericServlet {
- private static final String KEY_MESSENGER_MANAGER =
MessengerManager.class.getName();
- private static final String KEY_SUBSCRIPTIONLIST =
SubscriptionList.class.getName();
- private static final String KEY_CONNECTIONS = "connections";
- private static final String KEY_SUBSCRIPTIONS = "subscriptions";
-
/** Should HTTP servlets be used or generic servlets. If true then JSP can be
dispatched to easily */
private static final boolean USE_HTTP_SERVLETS = true;
+ private static final String KEY_CONNECTIONS = "connections";
+ private static final String KEY_SUBSCRIPTIONS = "subscriptions";
+
/**
* Whether exceptions occurring during subscriptions on startup should
* terminate the initialization
@@ -57,40 +55,15 @@
public ManagerServlet() {
}
- public MessengerManager getMessengerManager() {
- return (MessengerManager) getServletContext().getAttribute(
KEY_MESSENGER_MANAGER );
- }
-
- public void setMessengerManager(MessengerManager messengerManager) {
- if ( messengerManager == null ) {
- getServletContext().removeAttribute( KEY_MESSENGER_MANAGER );
- }
- else {
- getServletContext().setAttribute( KEY_MESSENGER_MANAGER,
messengerManager );
+ public SubscriptionManager getSubscriptionManager() {
+ SubscriptionManager answer = (SubscriptionManager)
getServletContext().getAttribute( "subscriptionManager" );
+ if (answer == null) {
+ answer = new SubscriptionManager();
+ getServletContext().setAttribute( "subscriptionManager", answer );
}
+ return answer;
}
-
- public SubscriptionList getSubscriptionList() {
- return (SubscriptionList) getServletContext().getAttribute(
KEY_SUBSCRIPTIONLIST );
- }
-
- public void setSubscriptionList(SubscriptionList subscriptionList) {
- if ( subscriptionList == null ) {
- getServletContext().removeAttribute( KEY_SUBSCRIPTIONLIST );
- }
- else {
- getServletContext().setAttribute( KEY_SUBSCRIPTIONLIST,
subscriptionList );
- }
- }
-
- public Messenger getMessenger(String name) throws ServletException {
- MessengerManager messengerManager = getMessengerManager();
- if ( messengerManager == null ) {
- throw new ServletException( "No MessengerManager has been initialized
yet" );
- }
- return messengerManager.getMessenger( name );
- }
-
+
// Servlet methods
//-------------------------------------------------------------------------
@@ -101,44 +74,56 @@
}
// ensure Messenger is initialised
- MessengerManager manager = getMessengerManager();
- if ( manager == null ) {
- manager = createMessengerManager();
- setMessengerManager( manager );
-
- // load the subscriptions....
- SubscriptionList list = createSubscriptionList();
- subscribe( list );
- setSubscriptionList( list );
-
- // now lets start all the connections...
- for (Iterator iter = manager.getMessengerNames(); iter.hasNext(); ) {
- String name = (String) iter.next();
- Messenger messenger = manager.getMessenger( name );
- try {
- messenger.getConnection().start();
- }
- catch (JMSException e) {
- log( "Caught exception trying to start messenger: " + name + ".
Exception: " + e, e );
+ try {
+ SubscriptionManager subscriber = getSubscriptionManager();
+ MessengerManager manager = subscriber.getMessengerManager();
+ if ( manager == null ) {
+ manager = createMessengerManager();
+
+ subscriber.setMessengerManager( manager );
+ subscriber.setSubscriptionList( createSubscriptionList() );
+ subscriber.setServletContext( getServletContext() );
+
+ // load the subscriptions....
+ subscriber.subscribe();
+
+ // now lets start all the connections...
+ for (Iterator iter = manager.getMessengerNames(); iter.hasNext(); )
{
+ String name = (String) iter.next();
+ Messenger messenger = manager.getMessenger( name );
+ try {
+ messenger.getConnection().start();
+ }
+ catch (JMSException e) {
+ log( "Caught exception trying to start messenger: " + name
+ ". Exception: " + e, e );
+ }
}
}
}
+ catch (JMSException e) {
+ throw new ServletException("Failed to initialize: " + e, e );
+ }
}
public void destroy() {
try {
- destroyMBOs();
+ getSubscriptionManager().unsubscribe();
}
- catch (ServletException e) {
+ catch (Exception e) {
log( "Failed to destrory the MBOs: " + e, e );
}
-
- MessengerManager manager = getMessengerManager();
- if ( manager != null ) {
- log( "Closing the Messenger connections" );
- manager.close();
+
+ try {
+ MessengerManager manager =
getSubscriptionManager().getMessengerManager();
+ if ( manager != null ) {
+ log( "Closing the Messenger connections" );
+ manager.close();
+ }
}
- setMessengerManager( null );
+ catch (Exception e) {
+ log( "Failed to close the Messenger Manager: " + e, e );
+ }
+ getSubscriptionManager().setMessengerManager( null );
}
public void service(ServletRequest request, ServletResponse response) throws
ServletException {
@@ -162,140 +147,6 @@
// Implementation methods
//-------------------------------------------------------------------------
- protected void subscribe( SubscriptionList list ) throws ServletException {
- for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) {
- Subscription subscription = (Subscription) iter.next();
- subscribe( subscription );
- }
- }
-
- protected void subscribe( Subscription subscription ) throws ServletException {
- String name = subscription.getConnection();
- Messenger messenger = getMessenger( name );
- if ( messenger == null ) {
- throw new ServletException( "No such Messenger called: " + name + " for
subscription: " + subscription );
- }
- MessageListener listener = null;
- String servlet = subscription.getServlet();
- if ( servlet != null ) {
- if ( USE_HTTP_SERVLETS ) {
- listener = new MessageHttpServletDispatcher( servlet );
- }
- else {
- listener = new MessageServletDispatcher( servlet );
- }
- }
- else {
- listener = subscription.getMessageListener();
- if ( listener == null ) {
- throw new ServletException( "No MessageListener is defined for
subscription: " + subscription );
- }
- }
-
- // if its an MDO the initialise it!
- if ( listener instanceof MessageDrivenObject ) {
- MessageDrivenObject mdo = (MessageDrivenObject) listener;
- if ( mdo instanceof MessengerMDO ) {
- MessengerMDO messengerMDO = (MessengerMDO) mdo;
- messengerMDO.setMessenger( messenger );
- messengerMDO.setMessengerManager( getMessengerManager() );
- }
- mdo.init( getServletContext() );
- }
-
- String subject = subscription.getSubject();
- if ( subject == null || subject.length() == 0 ) {
- throw new ServletException( "No destination defined for subscription: "
+ subscription );
- }
-
- Destination destination = null;
- try {
- destination = messenger.getDestination( subject );
- }
- catch (JMSException e) {
- handleJMSException( "Could not create destination for name: " + subject
+ " for subscription: " + subscription, e );
- }
- if ( destination == null ) {
- throw new ServletException( "No destination could be found for name: "
+ subject + " for subscription: " + subscription );
- }
-
- // #### at this point we may wish to create a thread pool of multiple
threads
- // #### each consuming from the same Destination in parallel
-
- try {
- String selector = subscription.getSelector();
- if ( selector != null && selector.length() > 0 ) {
- log( "Subscribing to messenger: " + name + " destination: " +
subject + " selector: " + selector );
-
- messenger.addListener( destination, selector, listener );
- }
- else {
- log( "Subscribing to messenger: " + name + " destination: " +
subject );
-
- messenger.addListener( destination, listener );
- }
- }
- catch (JMSException e) {
- handleJMSException( "Could not subscribe to destination:" + destination
+ " for subscription: " + subscription, e );
- }
- }
-
- /** Destrorys all current MBOs in this web application */
- protected void destroyMBOs() throws ServletException {
- SubscriptionList list = getSubscriptionList();
- if ( list != null ) {
- for (Iterator iter = list.getSubscriptions().iterator();
iter.hasNext(); ) {
- Subscription subscription = (Subscription) iter.next();
- destroyMBOs( subscription );
- }
- }
- }
-
- protected void destroyMBOs( Subscription subscription ) throws ServletException
{
- // lets unsubscribe first
- String name = subscription.getConnection();
- Messenger messenger = getMessenger( name );
- MessageListener listener = subscription.getMessageListener();
- if ( messenger != null && listener != null ) {
- Destination destination = null;
- String subject = subscription.getSubject();
- if ( subject == null || subject.length() == 0 ) {
- log( "No destination defined for subscription: " + subscription );
- }
- else {
- try {
- destination = messenger.getDestination( subject );
- if ( destination == null ) {
- log( "No destination could be found for name: " + subject +
" for subscription: " + subscription );
- }
- }
- catch (JMSException e) {
- log( "Could not create destination for name: " + subject + "
for subscription: " + subscription, e );
- }
- }
- if ( destination != null ) {
- try {
- String selector = subscription.getSelector();
- if ( selector != null && selector.length() > 0 ) {
- messenger.removeListener( destination, selector, listener );
- }
- else {
- messenger.removeListener( destination, listener );
- }
- }
- catch (JMSException e) {
- log( "Could not unsubscribe to destination:" + destination + "
for subscription: " + subscription, e );
- }
- }
- }
-
- // now lets destrory the MBO
- if ( listener instanceof MessageDrivenObject ) {
- MessageDrivenObject mdo = (MessageDrivenObject) listener;
- mdo.destroy();
- }
- }
-
protected MessengerManager createMessengerManager() throws ServletException {
String config = getURLResource( KEY_CONNECTIONS, "The Messenger connections
XML deployment document" );
1.1
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/SubscriptionManager.java
Index: SubscriptionManager.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*
* $Id: ManagerServlet.java,v 1.12 2002/05/15 14:36:34 jstrachan Exp $
*/
package org.apache.commons.messagelet;
import java.util.Iterator;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.JMSException;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.messagelet.model.Subscription;
import org.apache.commons.messagelet.model.SubscriptionList;
import org.apache.commons.messenger.Messenger;
import org.apache.commons.messenger.MessengerManager;
import org.apache.commons.messenger.tool.StopWatchMessageListener;
/**
* <p><code>SubscriptionManager</code> is a simple command line program that will
* create a number of subscriptions and consume messages using just regular
* MDO and MessageListener classes.
*
* @author <a href="mailto:jstrachan@;apache.org">James Strachan</a>
* @version $Revision: 1.12 $
*/
public class SubscriptionManager {
/** Logger */
private static final Log log = LogFactory.getLog(SubscriptionManager.class);
/** The JMS connections */
private MessengerManager manager;
/** The JMS Subscriptions */
private SubscriptionList subscriptionList;
/** The context passed into MDOs */
private ServletContext servletContext;
/** Should we use a stopwatch to output performance metrics */
private boolean useStopWatch = false;
public SubscriptionManager() {
}
protected void subscribe() throws JMSException, ServletException {
for (Iterator iter = getSubscriptionList().getSubscriptions().iterator();
iter.hasNext(); ) {
Subscription subscription = (Subscription) iter.next();
subscribe( subscription );
}
}
public void subscribe( Subscription subscription ) throws JMSException,
ServletException{
String name = subscription.getConnection();
Messenger messenger = getMessenger( name );
if ( messenger == null ) {
throw new JMSException( "No such Messenger called: " + name + " for
subscription: " + subscription );
}
String subject = subscription.getSubject();
if ( subject == null || subject.length() == 0 ) {
throw new JMSException( "No destination defined for subscription: " +
subscription );
}
Destination destination = messenger.getDestination( subject );
if ( destination == null ) {
throw new JMSException( "No destination could be found for name: " +
subject + " for subscription: " + subscription );
}
MessageListener listener = subscription.getMessageListener();
if ( listener == null ) {
throw new JMSException( "No MessageListener is defined for subscription:
" + subscription );
}
// if its an MDO the initialise it!
if ( listener instanceof MessageDrivenObject ) {
MessageDrivenObject mdo = (MessageDrivenObject) listener;
if ( mdo instanceof MessengerMDO ) {
MessengerMDO messengerMDO = (MessengerMDO) mdo;
messengerMDO.setMessenger( messenger );
messengerMDO.setMessengerManager( getMessengerManager() );
}
mdo.init( getServletContext() );
}
listener = wrapInStopWatch( listener );
String selector = subscription.getSelector();
ConsumerThread thread = subscription.getConsumerThread();
if (thread != null) {
log.info( "Subscribing to messenger: " + name + " destination: " +
subject + " selector: " + selector + " with: " + thread );
thread.setMessenger(messenger);
thread.setDestination(destination);
thread.setSelector(selector);
thread.setListener(listener);
thread.start();
}
else {
if ( selector != null && selector.length() > 0 ) {
log.info( "Subscribing to messenger: " + name + " destination: " +
subject + " selector: " + selector );
messenger.addListener( destination, selector, listener );
}
else {
log.info( "Subscribing to messenger: " + name + " destination: " +
subject );
messenger.addListener( destination, listener );
}
log.info( "Subscribed with listener: " + listener );
}
}
public void unsubscribe() throws JMSException, ServletException {
SubscriptionList list = getSubscriptionList();
if ( list != null ) {
for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext();
) {
Subscription subscription = (Subscription) iter.next();
unsubscribe( subscription );
}
}
}
public void unsubscribe( Subscription subscription ) throws JMSException,
ServletException {
// lets unsubscribe first
String name = subscription.getConnection();
Messenger messenger = getMessenger( name );
MessageListener listener = subscription.getMessageListener();
if ( messenger != null && listener != null ) {
Destination destination = null;
String subject = subscription.getSubject();
if ( subject == null || subject.length() == 0 ) {
log.error( "No destination defined for subscription: " +
subscription );
}
else {
try {
destination = messenger.getDestination( subject );
if ( destination == null ) {
log.error( "No destination could be found for name: " +
subject + " for subscription: " + subscription );
}
}
catch (JMSException e) {
log.error( "Could not create destination for name: " + subject +
" for subscription: " + subscription, e );
}
}
if ( destination != null ) {
try {
String selector = subscription.getSelector();
if ( selector != null && selector.length() > 0 ) {
messenger.removeListener( destination, selector, listener );
}
else {
messenger.removeListener( destination, listener );
}
}
catch (JMSException e) {
log.error( "Could not unsubscribe to destination:" + destination
+ " for subscription: " + subscription, e );
}
}
}
// now lets destrory the MBO
if ( listener instanceof MessageDrivenObject ) {
MessageDrivenObject mdo = (MessageDrivenObject) listener;
mdo.destroy();
}
}
// Properties
//-------------------------------------------------------------------------
public MessengerManager getMessengerManager() throws JMSException {
return manager;
}
public void setMessengerManager(MessengerManager manager) {
this.manager = manager;
}
/**
* Returns the subscriptionList.
* @return SubscriptionList
*/
public SubscriptionList getSubscriptionList() {
return subscriptionList;
}
/**
* Sets the subscriptionList.
* @param subscriptionList The subscriptionList to set
*/
public void setSubscriptionList(SubscriptionList subscriptionList) {
this.subscriptionList = subscriptionList;
}
/**
* Returns the servletContext.
* @return ServletContext
*/
public ServletContext getServletContext() {
return servletContext;
}
/**
* Sets the servletContext.
* @param servletContext The servletContext to set
*/
public void setServletContext(ServletContext servletContext) {
this.servletContext = servletContext;
}
// Implementation methods
//-------------------------------------------------------------------------
/**
* Allows the MessageListener to be wrapped inside a stop watch message listener
if required
*/
protected MessageListener wrapInStopWatch( MessageListener listener ) {
if ( useStopWatch ) {
return new StopWatchMessageListener( listener );
}
return listener;
}
protected Messenger getMessenger(String name) throws JMSException {
return getMessengerManager().getMessenger( name );
}
}
1.1
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ConsumerThread.java
Index: ConsumerThread.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*
* $Id: ManagerServlet.java,v 1.12 2002/05/15 14:36:34 jstrachan Exp $
*/
package org.apache.commons.messagelet;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.messenger.Messenger;
/**
* <p><code>ConsumerThread</code> is a thread which will repeatedly consume JMS
messages
* using a receive() method on Messenger and then process the message.
* This class is a good base class when implementing some kind of transactional
processing of
* JMS messages
*
* @author <a href="mailto:jstrachan@;apache.org">James Strachan</a>
* @version $Revision: 1.12 $
*/
public class ConsumerThread extends Thread {
/** Logger */
private static final Log log = LogFactory.getLog(ConsumerThread.class);
private MessageConsumer consumer;
private Messenger messenger;
private Destination destination;
private String selector;
private MessageListener listener;
private boolean shouldStop;
public ConsumerThread() {
setName("Consumer" + getName());
}
/**
* Starts all the JMS connections and consumes JMS messages,
* passing them onto the MessageListener and Message Driven Objects
*/
public void run() {
if (log.isDebugEnabled()) {
log.debug( "Starting consumer thread: " + getName());
}
try {
startConsumer();
}
catch (JMSException e) {
log.error("Failed to start consumer thread: " + e, e);
setShouldStop(true);
}
while (! isShouldStop()) {
startTransaction();
try {
Message message = receive();
if (log.isTraceEnabled()) {
log.trace( "Found: " + message );
}
if (message != null) {
processMessage(message);
commitTransaction();
}
else {
cancelTransaction();
}
}
catch (Exception e) {
rollbackTransaction(e);
}
}
try {
stopConsumer();
}
catch (JMSException e) {
log.error("Failed to stop consuming messages: " + e, e);
}
}
// Properties
//-------------------------------------------------------------------------
/**
* Returns the destination.
* @return Destination
*/
public Destination getDestination() {
return destination;
}
/**
* Returns the listener.
* @return MessageListener
*/
public MessageListener getListener() {
return listener;
}
/**
* Returns the messenger.
* @return Messenger
*/
public Messenger getMessenger() {
return messenger;
}
/**
* Returns the selector.
* @return String
*/
public String getSelector() {
return selector;
}
/**
* Returns the shouldStop.
* @return boolean
*/
public boolean isShouldStop() {
return shouldStop;
}
/**
* Sets the destination.
* @param destination The destination to set
*/
public void setDestination(Destination destination) {
this.destination = destination;
}
/**
* Sets the listener.
* @param listener The listener to set
*/
public void setListener(MessageListener listener) {
this.listener = listener;
}
/**
* Sets the messenger.
* @param messenger The messenger to set
*/
public void setMessenger(Messenger messenger) {
this.messenger = messenger;
}
/**
* Sets the selector.
* @param selector The selector to set
*/
public void setSelector(String selector) {
this.selector = selector;
}
/**
* Sets the shouldStop.
* @param shouldStop The shouldStop to set
*/
public void setShouldStop(boolean shouldStop) {
this.shouldStop = shouldStop;
}
// Implementation methods
//-------------------------------------------------------------------------
/**
* Starts consuming messages
*/
protected void startConsumer() throws JMSException {
consumer = createConsumer();
}
/**
* Stops consuming messages
*/
protected void stopConsumer() throws JMSException {
consumer.close();
}
/**
* Factory method to create a new MessageConsumer
*/
protected MessageConsumer createConsumer() throws JMSException {
String selector = getSelector();
if (selector != null) {
return getMessenger().createConsumer(getDestination(), selector);
}
else {
return getMessenger().createConsumer(getDestination());
}
}
/**
* Strategy method to consume a message using a receive() kind of method.
* @return the message or null if a message could not be found after waiting for
* some period of time.
*/
private Message receive() throws JMSException {
return getConsumer().receive();
}
/**
* Strategy method to process a given message.
* By default this will just invoke the MessageListener
*/
protected void processMessage(Message message) throws JMSException {
MessageListener listener = getListener();
if (listener != null) {
listener.onMessage(message);
}
}
/**
* Strategy method to represent the code required to start
* a transaction.
*/
protected void startTransaction() {
}
/**
* Strategy method to represent the code required to commit
* a transaction.
*/
protected void commitTransaction() throws Exception {
}
/**
* Strategy method to represent the code required to rollback
* a transaction.
*/
protected void rollbackTransaction(Exception e) {
}
/**
* Strategy method to represent the code required to cancel
* a transaction.
* This is called when a message is not received.
*/
protected void cancelTransaction() throws Exception {
}
/**
* @erturn the consumer of messages
*/
protected MessageConsumer getConsumer() {
return consumer;
}
}
1.2 +10 -0
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/SubscriptionDigester.java
Index: SubscriptionDigester.java
===================================================================
RCS file:
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/SubscriptionDigester.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SubscriptionDigester.java 14 Aug 2002 14:36:12 -0000 1.1
+++ SubscriptionDigester.java 21 Oct 2002 20:31:28 -0000 1.2
@@ -26,6 +26,7 @@
private String bridgeClass = "org.apache.commons.messagelet.BridgeMDO";
private String distributeBridgeClass =
"org.apache.commons.messagelet.DistributeBridgeMDO";
private String stopWatchClass =
"org.apache.commons.messenger.tool.StopWatchMessageListener";
+ private String consumerThreadClass =
"org.apache.commons.messagelet.ConsumerThread";
public SubscriptionDigester() {
}
@@ -58,12 +59,21 @@
addCallMethod( "subscriptions/subscription/servlet", "setServlet", 0);
+ path = "subscriptions/subscription/consumerThread";
+ addObjectCreate( path, consumerThreadClass, "className" );
+ addSetProperties( path );
+ addSetNext( path, "setConsumerThread",
+ consumerThreadClass
+ );
+
path = "subscriptions/subscription/listener";
addObjectCreate( path, listenerClass, "className" );
addSetProperties( path );
addSetNext( path, "setMessageListener",
"javax.jms.MessageListener"
);
+
+
path = "subscriptions/subscription/stopWatch";
addObjectCreate( path, stopWatchClass, "className" );
1.2 +22 -1
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/Subscription.java
Index: Subscription.java
===================================================================
RCS file:
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/Subscription.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- Subscription.java 14 Aug 2002 14:36:12 -0000 1.1
+++ Subscription.java 21 Oct 2002 20:31:28 -0000 1.2
@@ -13,6 +13,8 @@
import javax.jms.JMSException;
import javax.jms.MessageListener;
+import org.apache.commons.messagelet.ConsumerThread;
+
import org.apache.commons.messenger.Messenger;
import org.apache.commons.messenger.MessengerManager;
@@ -38,6 +40,9 @@
/** Holds value of property servlet. */
private String servlet;
+
+ /** should a ConsumerThread be used to consume these messages */
+ private ConsumerThread consumerThread;
public Subscription() {
}
@@ -114,6 +119,22 @@
}
+ /**
+ * Returns the consumerThread.
+ * @return ConsumerThread
+ */
+ public ConsumerThread getConsumerThread() {
+ return consumerThread;
+ }
+
+ /**
+ * Sets the consumerThread.
+ * @param consumerThread The consumerThread to set
+ */
+ public void setConsumerThread(ConsumerThread consumerThread) {
+ this.consumerThread = consumerThread;
+ }
+
/** Outputs a debugging string */
public String toString() {
StringBuffer buffer = new StringBuffer( super.toString() );
@@ -135,5 +156,5 @@
}
return buffer.toString();
}
-
+
}
1.4 +2 -2 jakarta-commons-sandbox/messenger/maven.xml
Index: maven.xml
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/maven.xml,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- maven.xml 7 Oct 2002 10:38:23 -0000 1.3
+++ maven.xml 21 Oct 2002 20:31:28 -0000 1.4
@@ -36,13 +36,13 @@
</java>
</goal>
- <goal name="demo.send" prereqs="set.classpath"
+ <goal name="demo:send" prereqs="set.classpath"
description="Sends a message for processing by an MDO">
<java classname="org.apache.commons.messenger.tool.Producer" fork="yes">
<classpath refid="jms.classpath"/>
<arg value="queue"/>
- <arg value="echo.queue"/>
+ <arg value="echo.queue2"/>
<arg value="src/conf/sampleMessage.txt"/>
<sysproperty key="org.apache.commons.messenger" value="${messenger.xml}"/>
</java>
1.10 +5 -0 jakarta-commons-sandbox/messenger/src/conf/subscribe.xml
Index: subscribe.xml
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/conf/subscribe.xml,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- subscribe.xml 7 Oct 2002 10:38:23 -0000 1.9
+++ subscribe.xml 21 Oct 2002 20:31:28 -0000 1.10
@@ -6,6 +6,11 @@
<listener className="org.apache.commons.messenger.LoggingMDO"/>
</subscription>
+ <subscription connection="queue" subject="echo.queue2">
+ <consumerThread/>
+ <listener className="org.apache.commons.messenger.LoggingMDO"/>
+ </subscription>
+
<!-- bridge to another JMS provider -->
<subscription connection="queue" subject="my.input2" selector="b='12'">
<bridge outputConnection="queue" outputSubject="my.output"/>
--
To unsubscribe, e-mail: <mailto:commons-dev-unsubscribe@;jakarta.apache.org>
For additional commands, e-mail: <mailto:commons-dev-help@;jakarta.apache.org>