jstrachan 01/08/30 05:18:06
Modified: messenger/src/java/org/apache/commons/messenger
Messenger.java MessengerSupport.java
messenger/src/test/org/apache/commons/messenger
TestMessenger.java
Log:
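Modified the Messenger API to take a javax.jms.Destination rather than a String
subject name. This allows cleaner JMS integration, particularly when sending a
reply to a JMS message. Callers that only have a subject name can obtain the
corresponding Destination via the new Messenger.getDestination(String) method.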
Revision Changes Path
1.4 +27 -23
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Messenger.java
Index: Messenger.java
===================================================================
RCS file:
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Messenger.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- Messenger.java 2001/08/28 22:38:28 1.3
+++ Messenger.java 2001/08/30 12:18:06 1.4
@@ -5,13 +5,14 @@
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*
- * $Id: Messenger.java,v 1.3 2001/08/28 22:38:28 jstrachan Exp $
+ * $Id: Messenger.java,v 1.4 2001/08/30 12:18:06 jstrachan Exp $
*/
package org.apache.commons.messenger;
import java.io.Serializable;
import javax.jms.BytesMessage;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
@@ -28,47 +29,50 @@
* taglib) will use the same JMS Session.</p>
*
* @author <a href="mailto:[EMAIL PROTECTED]">James Strachan</a>
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public interface Messenger {
/** Temporary hack - this method has been added so that Messenger works better
with the digester */
public String getName();
+
+ /** Returns the destination for the given subject name */
+ public Destination getDestination(String subject) throws JMSException;
- /** Sends a message on the given subject name */
- public void send(String subject, Message message) throws JMSException;
+ /** Sends a message on the given destination */
+ public void send(Destination destination, Message message) throws JMSException;
- /** Sends a message on the given subject and blocks until a response is
returned */
- public Message call(String subject, Message message) throws JMSException;
+ /** Sends a message on the given destination and blocks until a response is
returned */
+ public Message call(Destination destination, Message message) throws
JMSException;
- /** Receives a message on the given subject name, blocking until one is
returned */
- public Message receive(String subject) throws JMSException;
+ /** Receives a message on the given destination, blocking until one is returned
*/
+ public Message receive(Destination destination) throws JMSException;
- /** Receives a message on the given subject name and message selector, blocking
until one is returned */
- public Message receive(String subject, String selector) throws JMSException;
+ /** Receives a message on the given destination and message selector, blocking
until one is returned */
+ public Message receive(Destination destination, String selector) throws
JMSException;
- /** Receives a message on the given subject name, blocking for the specified
timeout */
- public Message receive(String subject, long timeoutMillis) throws JMSException;
+ /** Receives a message on the given destination, blocking for the specified
timeout */
+ public Message receive(Destination destination, long timeoutMillis) throws
JMSException;
- /** Receives a message on the given subject name and selector, blocking for the
specified timeout */
- public Message receive(String subject, String selector, long timeoutMillis)
throws JMSException;
+ /** Receives a message on the given destination and selector, blocking for the
specified timeout */
+ public Message receive(Destination destination, String selector, long
timeoutMillis) throws JMSException;
- /** Receives a message on the given subject name without blocking or returns
null */
- public Message receiveNoWait(String subject) throws JMSException;
+ /** Receives a message on the given destination without blocking or returns
null */
+ public Message receiveNoWait(Destination destination) throws JMSException;
- /** Receives a message on the given subject name and selector without blocking
or returns null */
- public Message receiveNoWait(String subject, String selector) throws
JMSException;
+ /** Receives a message on the given destination and selector without blocking
or returns null */
+ public Message receiveNoWait(Destination destination, String selector) throws
JMSException;
// Listener API
//-------------------------------------------------------------------------
- /** Adds a message listener on the given subject */
- public void addListener(String subject, MessageListener listener) throws
JMSException;
- public void addListener(String subject, String selector, MessageListener
listener) throws JMSException;
+ /** Adds a message listener on the given destination */
+ public void addListener(Destination destination, MessageListener listener)
throws JMSException;
+ public void addListener(Destination destination, String selector,
MessageListener listener) throws JMSException;
- public void removeListener(String subject, MessageListener listener) throws
JMSException;
- public void removeListener(String subject, String selector, MessageListener
listener) throws JMSException;
+ public void removeListener(Destination destination, MessageListener listener)
throws JMSException;
+ public void removeListener(Destination destination, String selector,
MessageListener listener) throws JMSException;
// Message factory methods
1.4 +61 -53
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
Index: MessengerSupport.java
===================================================================
RCS file:
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- MessengerSupport.java 2001/08/28 22:38:28 1.3
+++ MessengerSupport.java 2001/08/30 12:18:06 1.4
@@ -5,13 +5,14 @@
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*
- * $Id: MessengerSupport.java,v 1.3 2001/08/28 22:38:28 jstrachan Exp $
+ * $Id: MessengerSupport.java,v 1.4 2001/08/30 12:18:06 jstrachan Exp $
*/
package org.apache.commons.messenger;
import java.io.Serializable;
import javax.jms.BytesMessage;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
@@ -37,7 +38,7 @@
* connection and session creation and the pooling strategy.</p>
*
* @author <a href="mailto:[EMAIL PROTECTED]">James Strachan</a>
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public abstract class MessengerSupport implements Messenger {
@@ -58,10 +59,25 @@
public MessengerSupport() {
}
- public void send( String subject, Message message ) throws JMSException {
+ public Destination getDestination(String subject) throws JMSException {
Session session = borrowSession();
try {
- MessageProducer producer = getMessageProducer( session, subject );
+ if ( session instanceof TopicSession ) {
+ return getTopic( (TopicSession) session, subject );
+ }
+ else {
+ return getQueue( (QueueSession) session, subject );
+ }
+ }
+ finally {
+ returnSession( session );
+ }
+ }
+
+ public void send( Destination destination, Message message ) throws
JMSException {
+ Session session = borrowSession();
+ try {
+ MessageProducer producer = getMessageProducer( session, destination );
if ( producer instanceof TopicPublisher ) {
((TopicPublisher) producer).publish( message );
}
@@ -74,14 +90,14 @@
}
}
- public Message call( String subject, Message message ) throws JMSException {
+ public Message call( Destination destination, Message message ) throws
JMSException {
Session session = borrowSession();
try {
if ( session instanceof TopicSession ) {
TopicSession topicSession = (TopicSession) session;
TopicRequestor requestor = new TopicRequestor(
topicSession,
- getTopic( topicSession, subject )
+ (Topic) destination
);
return requestor.request( message );
}
@@ -89,7 +105,7 @@
QueueSession queueSession = (QueueSession) session;
QueueRequestor requestor = new QueueRequestor(
queueSession,
- getQueue( queueSession, subject )
+ (Queue) destination
);
return requestor.request( message );
}
@@ -99,10 +115,10 @@
}
}
- public Message receive(String subject) throws JMSException {
+ public Message receive(Destination destination) throws JMSException {
Session session = borrowSession();
try {
- MessageConsumer consumer = getMessageConsumer( session, subject );
+ MessageConsumer consumer = getMessageConsumer( session, destination );
return consumer.receive();
}
finally {
@@ -110,10 +126,10 @@
}
}
- public Message receive(String subject, String selector) throws JMSException {
+ public Message receive(Destination destination, String selector) throws
JMSException {
Session session = borrowSession();
try {
- MessageConsumer consumer = getMessageConsumer( session, subject,
selector );
+ MessageConsumer consumer = getMessageConsumer( session, destination,
selector );
return consumer.receive();
}
finally {
@@ -121,10 +137,10 @@
}
}
- public Message receive(String subject, long timeoutMillis) throws JMSException
{
+ public Message receive(Destination destination, long timeoutMillis) throws
JMSException {
Session session = borrowSession();
try {
- MessageConsumer consumer = getMessageConsumer( session, subject );
+ MessageConsumer consumer = getMessageConsumer( session, destination );
return consumer.receive(timeoutMillis);
}
finally {
@@ -132,10 +148,10 @@
}
}
- public Message receive(String subject, String selector, long timeoutMillis)
throws JMSException {
+ public Message receive(Destination destination, String selector, long
timeoutMillis) throws JMSException {
Session session = borrowSession();
try {
- MessageConsumer consumer = getMessageConsumer( session, subject,
selector );
+ MessageConsumer consumer = getMessageConsumer( session, destination,
selector );
return consumer.receive(timeoutMillis);
}
finally {
@@ -143,10 +159,10 @@
}
}
- public Message receiveNoWait(String subject) throws JMSException {
+ public Message receiveNoWait(Destination destination) throws JMSException {
Session session = borrowSession();
try {
- MessageConsumer consumer = getMessageConsumer( session, subject );
+ MessageConsumer consumer = getMessageConsumer( session, destination );
return consumer.receiveNoWait();
}
finally {
@@ -154,10 +170,10 @@
}
}
- public Message receiveNoWait(String subject, String selector) throws
JMSException {
+ public Message receiveNoWait(Destination destination, String selector) throws
JMSException {
Session session = borrowSession();
try {
- MessageConsumer consumer = getMessageConsumer( session, subject,
selector );
+ MessageConsumer consumer = getMessageConsumer( session, destination,
selector );
return consumer.receiveNoWait();
}
finally {
@@ -169,10 +185,10 @@
// Listener API
//-------------------------------------------------------------------------
- public void addListener(String subject, MessageListener listener) throws
JMSException {
+ public void addListener(Destination destination, MessageListener listener)
throws JMSException {
Session session = borrowSession();
try {
- MessageConsumer consumer = createMessageConsumer( session, subject );
+ MessageConsumer consumer = createMessageConsumer( session, destination
);
consumer.setMessageListener( listener );
}
finally {
@@ -180,10 +196,10 @@
}
}
- public void addListener(String subject, String selector, MessageListener
listener) throws JMSException {
+ public void addListener(Destination destination, String selector,
MessageListener listener) throws JMSException {
Session session = borrowSession();
try {
- MessageConsumer consumer = createMessageConsumer( session, subject,
selector );
+ MessageConsumer consumer = createMessageConsumer( session, destination,
selector );
consumer.setMessageListener( listener );
}
finally {
@@ -192,12 +208,12 @@
}
- public void removeListener(String subject, MessageListener listener ) throws
JMSException {
+ public void removeListener(Destination destination, MessageListener listener )
throws JMSException {
// we need to iterate through all sessions to find which one has a listener
for
throw new JMSException( "Not implemented yet" );
}
- public void removeListener(String subject, String selector, MessageListener
listener ) throws JMSException {
+ public void removeListener(Destination destination, String selector,
MessageListener listener ) throws JMSException {
// we need to iterate through all sessions to find which one has a listener
for
throw new JMSException( "Not implemented yet" );
}
@@ -373,59 +389,59 @@
/** Deletes a session instance */
protected abstract void deleteSession(Session session) throws JMSException;
- /** Returns a message producer for the given session and subject */
- protected MessageProducer getMessageProducer( Session session, String subject )
throws JMSException {
+ /** Returns a message producer for the given session and destination */
+ protected MessageProducer getMessageProducer( Session session, Destination
destination ) throws JMSException {
if ( session instanceof TopicSession ) {
TopicSession topicSession = (TopicSession) session;
- return topicSession.createPublisher( getTopic(topicSession, subject) );
+ return topicSession.createPublisher( (Topic) destination );
}
else {
QueueSession queueSession = (QueueSession) session;
- return queueSession.createSender( getQueue(queueSession, subject) );
+ return queueSession.createSender( (Queue) destination );
}
}
- /** Returns a MessageConsumer for the given session and subject */
- protected MessageConsumer getMessageConsumer( Session session, String subject )
throws JMSException {
+ /** Returns a MessageConsumer for the given session and destination */
+ protected MessageConsumer getMessageConsumer( Session session, Destination
destination ) throws JMSException {
// could do caching one day
- return createMessageConsumer( session, subject );
+ return createMessageConsumer( session, destination );
}
- /** Returns a MessageConsumer for the given session, subject and selector */
- protected MessageConsumer getMessageConsumer( Session session, String subject,
String selector ) throws JMSException {
+ /** Returns a MessageConsumer for the given session, destination and selector
*/
+ protected MessageConsumer getMessageConsumer( Session session, Destination
destination, String selector ) throws JMSException {
// could do caching one day
- return createMessageConsumer( session, subject, selector );
+ return createMessageConsumer( session, destination, selector );
}
- /** Returns a new MessageConsumer for the given session and subject */
- protected MessageConsumer createMessageConsumer( Session session, String
subject ) throws JMSException {
+ /** Returns a new MessageConsumer for the given session and destination */
+ protected MessageConsumer createMessageConsumer( Session session, Destination
destination ) throws JMSException {
if ( session instanceof TopicSession ) {
TopicSession topicSession = (TopicSession) session;
if ( isDurable() ) {
return topicSession.createDurableSubscriber(
- getTopic(topicSession, subject),
+ (Topic) destination,
getDurableName()
);
}
else {
return topicSession.createSubscriber(
- getTopic(topicSession, subject)
+ (Topic) destination
);
}
}
else {
QueueSession queueSession = (QueueSession) session;
- return queueSession.createReceiver( getQueue(queueSession, subject) );
+ return queueSession.createReceiver( (Queue) destination );
}
}
- /** Returns a new MessageConsumer for the given session, subject and selector
*/
- protected MessageConsumer createMessageConsumer( Session session, String
subject, String selector ) throws JMSException {
+ /** Returns a new MessageConsumer for the given session, destination and
selector */
+ protected MessageConsumer createMessageConsumer( Session session, Destination
destination, String selector ) throws JMSException {
if ( session instanceof TopicSession ) {
TopicSession topicSession = (TopicSession) session;
if ( isDurable() ) {
return topicSession.createDurableSubscriber(
- getTopic(topicSession, subject),
+ (Topic) destination,
getDurableName(),
selector,
isNoLocal()
@@ -433,7 +449,7 @@
}
else {
return topicSession.createSubscriber(
- getTopic(topicSession, subject),
+ (Topic) destination,
selector,
isNoLocal()
);
@@ -442,23 +458,15 @@
else {
QueueSession queueSession = (QueueSession) session;
return queueSession.createReceiver(
- getQueue(queueSession, subject),
+ (Queue) destination,
selector
);
}
}
- protected Queue getQueue(Session session, String subject) throws JMSException {
- return getQueue((QueueSession) session, subject);
- }
-
protected Queue getQueue(QueueSession session, String subject) throws
JMSException {
// XXXX: might want to cache
return session.createQueue( subject );
- }
-
- protected Topic getTopic(Session session, String subject) throws JMSException {
- return getTopic((TopicSession) session, subject);
}
protected Topic getTopic(TopicSession session, String subject) throws
JMSException {
1.3 +15 -10
jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java
Index: TestMessenger.java
===================================================================
RCS file:
/home/cvs/jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- TestMessenger.java 2001/08/28 22:38:29 1.2
+++ TestMessenger.java 2001/08/30 12:18:06 1.3
@@ -5,7 +5,7 @@
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*
- * $Id: TestMessenger.java,v 1.2 2001/08/28 22:38:29 jstrachan Exp $
+ * $Id: TestMessenger.java,v 1.3 2001/08/30 12:18:06 jstrachan Exp $
*/
package org.apache.commons.messenger;
@@ -13,6 +13,7 @@
import java.util.ArrayList;
import java.util.List;
+import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.TextMessage;
@@ -23,15 +24,15 @@
/** Test harness for Messenger
*
* @author <a href="mailto:[EMAIL PROTECTED]">James Strachan</a>
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class TestMessenger extends TestCase {
protected static boolean verbose = true;
protected List failures = new ArrayList();
- protected String topic = "jms/Topic";
- protected String queue = "jms/Queue";
+ protected String topicName = "jms/Topic";
+ protected String queueName = "jms/Queue";
protected String topicMessageText = "This is the text of a topic message";
protected String queueMessageText = "This is the text of a queue message";
@@ -61,8 +62,9 @@
public void testSendTopic() throws Exception {
Messenger messenger = MessengerManager.get( "topic" );
+ Destination topic = messenger.getDestination( topicName );
- clearSubject( messenger, topic );
+ flushDestination( messenger, topic );
Thread thread = new Thread() {
public void run() {
@@ -95,8 +97,9 @@
public void testSendQueue() throws Exception {
Messenger messenger = MessengerManager.get( "queue" );
+ Destination queue = messenger.getDestination( queueName );
- clearSubject( messenger, queue );
+ flushDestination( messenger, queue );
Thread thread = new Thread() {
public void run() {
@@ -130,12 +133,12 @@
protected void setUp() throws Exception {
}
- protected void clearSubject(Messenger messenger, String subject) throws
Exception {
- log( "Clearing messenger subject: " + subject );
+ protected void flushDestination(Messenger messenger, Destination destination)
throws Exception {
+ log( "Clearing messenger destination: " + destination );
// lets remove any existing messages
while (true) {
- Message m = messenger.receiveNoWait( topic );
+ Message m = messenger.receiveNoWait( destination );
if ( m != null ) {
log( "Ignoring message: " + m );
}
@@ -144,11 +147,12 @@
}
}
- log( "Cleared messenger subject: " + subject );
+ log( "Cleared messenger destination: " + destination );
}
protected void receiveTopicMessage() throws Exception {
Messenger messenger = MessengerManager.get( "topic" );
+ Destination topic = messenger.getDestination( topicName );
log( "Calling receive() on topic" );
@@ -162,6 +166,7 @@
protected void receiveQueueMessage() throws Exception {
Messenger messenger = MessengerManager.get( "queue" );
+ Destination queue = messenger.getDestination( queueName );
log( "Calling receive() on queue" );