User: hiram
Date: 00/12/23 07:48:18
Modified: src/java/org/spydermq SpyQueueReceiver.java
SpyConnection.java SpyConnectionConsumer.java
SpyQueueSession.java SpyDistributedConnection.java
SpyMessageConsumer.java SpyQueueConnection.java
SpyQueueBrowser.java SpyConsumer.java
SpySession.java SpyMessage.java
SpyTopicSession.java SpyTopicSubscriber.java
SpyXAResource.java SpyXAResourceManager.java
TransactionRequest.java
Added: src/java/org/spydermq AcknowledgementRequest.java
ReceiveRequest.java Subscription.java
Removed: src/java/org/spydermq SpyAcknowledgementItem.java
Log:
These changes were done to add the following features:
The selector is now evaluated at the server side.
The infrastructure has been laid for durable topic subscriptions.
The QueueBrowser has been implemented.
Queues now can have a Selector.
Revision Changes Path
1.8 +19 -33 spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SpyQueueReceiver.java 2000/12/21 22:33:55 1.7
+++ SpyQueueReceiver.java 2000/12/23 15:48:14 1.8
@@ -18,24 +18,14 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class SpyQueueReceiver extends SpyMessageConsumer implements QueueReceiver {
// Attributes ----------------------------------------------------
//The queue I registered
private Queue queue;
- //Mode of this QueueReceiver
- boolean listening;
- // Constructor ---------------------------------------------------
-
- SpyQueueReceiver(SpyQueueSession session, Queue queue) {
- super(session, (SpyQueue) queue);
- this.queue = queue;
- listening = false;
- }
-
// Public --------------------------------------------------------
public Queue getQueue() throws JMSException {
@@ -45,26 +35,8 @@
return queue;
}
- public void close() throws JMSException {
-
- synchronized (messages) {
- if (closed)
- return;
-
- if (queue != null)
- session.removeConsumer(queue, this);
- setListening(false);
- if (waitInReceive && messageListener == null) {
- //A consumer could be waiting in receive()
- messages.notify();
- }
-
- closed = true;
- }
- }
-
public void setMessageListener(MessageListener listener) throws JMSException {
super.setMessageListener(listener);
setListening(listener != null);
@@ -72,15 +44,29 @@
//---
void setListening(boolean newvalue) throws JMSException {
- if (newvalue == listening)
+ if (newvalue == subscription.listening)
return;
- listening = newvalue;
+ subscription.listening = newvalue;
if (queue != null)
- session.getConnection().listenerChange(queue);
+ session.connection.listenerChange(subscription.subscriptionId,
subscription.listening);
}
+
+ // Constructor ---------------------------------------------------
+
+ SpyQueueReceiver(SpyQueueSession session, Queue queue, String selector) {
+ super(session, (SpyQueue) queue);
+ this.queue = queue;
+
+ subscription.durableSubscriptionName = null;
+ subscription.messageSelector = selector;
+ subscription.durableSubscriptionName = null;
+ subscription.noLocal = false;
+ }
+
+
- public boolean isListening() {
- return listening;
+ public Subscription getSubscription() {
+ return subscription;
}
}
1.19 +141 -173 spyderMQ/src/java/org/spydermq/SpyConnection.java
Index: SpyConnection.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- SpyConnection.java 2000/12/21 22:33:55 1.18
+++ SpyConnection.java 2000/12/23 15:48:14 1.19
@@ -27,13 +27,15 @@
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
+import java.util.LinkedList;
+
/**
* This class implements javax.jms.Connection
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.18 $
+ * @version $Revision: 1.19 $
*/
public class SpyConnection implements Connection, Serializable {
@@ -47,10 +49,15 @@
protected String clientID;
//the distributed object which receives messages from the JMS server
protected SpyDistributedConnection distributedConnection;
- //HashMap of ConnectionQueue by Destination
- public HashMap destinations;
+
//LinkedList of all created sessions by this connection
HashSet createdSessions;
+ // Numbers subscriptions
+ int subscriptionCounter = Integer.MIN_VALUE;
+ //Maps a destination to a LinkedList of Subscriptions
+ public HashMap destinationSubscriptions = new HashMap();
+ //Maps a a subsction id to a Subscription
+ public HashMap subscriptions = new HashMap();
//Last message ID returned
private int lastMessageID;
//Is the connection stopped ?
@@ -64,6 +71,7 @@
// Used to control tranactions
SpyXAResourceManager spyXAResourceManager;
+
//////////////////////////////////////////////////////////////
// Constructors
//////////////////////////////////////////////////////////////
@@ -71,7 +79,6 @@
SpyConnection(DistributedJMSServer theServer, String cID, String crCN) throws
JMSException {
//Set the attributes
provider = theServer;
- destinations = new HashMap();
createdSessions = new HashSet();
distributedConnection = null;
closed = false;
@@ -145,20 +152,11 @@
if (!modeStop)
return;
modeStop = false;
-
- Iterator i = destinations.keySet().iterator();
- while (i.hasNext()) {
- Destination d = (Destination) i.next();
- ConsumerSet ci = (ConsumerSet) destinations.get(d);
-
- if (ci.getLasListeningState()) {
- try {
-
provider.connectionListening(distributedConnection, true, d);
- } catch (Exception e) {
- failureHandler(e, "Cannot contact the JMS
server");
- }
- }
+ try {
+ provider.setEnabled(distributedConnection, true);
+ } catch (Exception e) {
+ failureHandler(e, "Cannot enable the connection with the JMS
provider");
}
changeModeStop(modeStop);
@@ -174,19 +172,10 @@
return;
modeStop = true;
- Iterator i = destinations.keySet().iterator();
- while (i.hasNext()) {
- Destination d = (Destination) i.next();
- ConsumerSet ci = (ConsumerSet) destinations.get(d);
-
- if (ci.getLasListeningState()) {
- try {
-
provider.connectionListening(distributedConnection, false, d);
- } catch (Exception e) {
- failureHandler(e, "Cannot contact the JMS
server");
- }
- }
-
+ try {
+ provider.setEnabled(distributedConnection, false);
+ } catch (Exception e) {
+ failureHandler(e, "Cannot disable the connection with the JMS
provider");
}
changeModeStop(modeStop);
@@ -240,10 +229,8 @@
try {
//Remove it from the destinations list
- synchronized (destinations) {
- HashMap newMap = (HashMap) destinations.clone();
- newMap.remove(dest);
- destinations = newMap;
+ synchronized (subscriptions) {
+ destinationSubscriptions.remove(dest);
}
//Notify its sessions that this TemporaryDestination is going
to be deleted()
@@ -326,85 +313,11 @@
}
}
- // The ConsumerSet inner class is used by:
- //
- // addConsumer()
- // removeConsumer()
- // getConsumers()
- // listenerChange()
- // pickListeningConsumer()
- //
- class ConsumerSet extends HashSet {
- boolean lasListeningState = false;
-
- boolean getLasListeningState() {
- return lasListeningState;
- }
-
- boolean listenStateChanged() {
- boolean t = false;
-
- Iterator iter = iterator();
- while (iter.hasNext()) {
- SpyConsumer c = (SpyConsumer) iter.next();
- if (c.isListening()) {
- t = true;
- break;
- }
- }
-
- if (t == lasListeningState) {
- return false;
- }
-
- lasListeningState = t;
- return true;
- }
- }
-
- /**
- * Called whenever a consumer changes his listening state on a destination.
- * We see if the consumer change, changed the overall listening state for the
destination.
- * Creation date: (11/16/2000 2:20:22 PM)
- * @return org.spydermq.distributed.interfaces.DistributedJMSServer
- */
- public void listenerChange(Destination d) throws JMSException {
- if (closed)
- throw new IllegalStateException("The connection is closed");
- if (distributedConnection == null)
- createReceiver();
- ConsumerSet ci = (ConsumerSet) destinations.get(d);
-
- if( ci == null )
- return;
- if (ci.listenStateChanged()) {
- try {
- if (ci.getLasListeningState()) {
-
provider.connectionListening(distributedConnection, true, d);
- } else {
-
provider.connectionListening(distributedConnection, false, d);
- }
- } catch (Exception e) {
- failureHandler(e, "Cannot contact the JMS server");
- }
- }
- }
- /**
- * @return org.spydermq.distributed.interfaces.DistributedJMSServer
- */
- SpyMessage queueReceive(Queue queue, long wait) throws JMSException {
- try {
- return provider.queueReceive(distributedConnection, queue,
wait);
- } catch (Exception e) {
- failureHandler(e, "Cannot create a ConnectionReceiver");
- return null;
- }
- }
////////////////////////////////////////////////////////////////////
// Protected
@@ -428,14 +341,7 @@
}
}
- // used to acknowledge a message
- protected void send(SpyAcknowledgementItem item) throws JMSException {
- try {
- provider.acknowledge(distributedConnection, item);
- } catch (Exception e) {
- failureHandler(e, "Cannot acknowlege a message.");
- }
- }
+
//Send a message to the provider
void sendToServer(SpyMessage mes) throws JMSException {
@@ -464,115 +370,177 @@
}
+
//A new Consumer has been created for the Destination dest
- void addConsumer(Destination dest, SpyConsumer consumer) throws JMSException {
+ void addConsumer(SpyConsumer consumer) throws JMSException {
if (closed)
throw new IllegalStateException("The connection is closed");
if (distributedConnection == null)
createReceiver();
- Log.log("Connection: addConsumer(dest=" + dest.toString() + ")");
+ Subscription req = consumer.getSubscription();
+ req.subscriptionId = subscriptionCounter++;
+ req.dc = distributedConnection;
- try {
+ Log.log("Connection: addConsumer(dest=" + req.destination.toString() +
")");
- synchronized (destinations) {
+
+ try {
- ConsumerSet consumerSet = (ConsumerSet)
destinations.get(dest);
+ synchronized (subscriptions ) {
- if (consumerSet == null) {
- consumerSet = new ConsumerSet();
- consumerSet.add(consumer);
- HashMap newDestinations = (HashMap)
destinations.clone();
- newDestinations.put(dest, consumerSet);
- destinations = newDestinations;
- provider.subscribe(distributedConnection,
dest);
- } else {
- consumerSet.add(consumer);
+ subscriptions.put(new Integer(req.subscriptionId),
consumer );
+
+ LinkedList ll =
(LinkedList)destinationSubscriptions.get( req.destination );
+ if( ll == null ) {
+ ll = new LinkedList();
+ destinationSubscriptions.put(req.destination,
ll );
}
+
+ ll.add( consumer );
}
+ provider.subscribe(distributedConnection, req);
+
} catch (Exception e) {
failureHandler(e, "Cannot subscribe to this Destination");
}
}
- //Gets all the consumers subscribed to a destination
- public SpyConsumer[] getConsumers(Destination dest) throws JMSException {
+ /**
+ * @return org.spydermq.distributed.interfaces.DistributedJMSServer
+ */
+ SpyMessage[] browse(Queue queue, String selector) throws JMSException {
- if (closed)
- throw new IllegalStateException("The connection is closed");
- if (distributedConnection == null)
- createReceiver();
+ try {
+ return provider.browse(distributedConnection, queue, selector);
+ } catch (Exception e) {
+ failureHandler(e, "Cannot browse the Queue.");
+ return null;
+ }
+ }
- synchronized (destinations) {
- ConsumerSet consumerSet = (ConsumerSet) destinations.get(dest);
- if (consumerSet == null || consumerSet.size() == 0)
- return null;
+ //Gets the first consumer that is listening to a destination.
+ public void deliver(ReceiveRequest requests[]) throws JMSException {
- SpyConsumer rc[] = new SpyConsumer[consumerSet.size()];
- return (SpyConsumer[]) consumerSet.toArray(rc);
- }
+ HashSet consumersUsed = new HashSet();
+
+ for( int i=0; i < requests.length; i++ ) {
+
+ if( requests[i].subscriptionId != null ) {
+
+ SpyConsumer consumer = (SpyConsumer)subscriptions.get(
requests[i].subscriptionId );
+ if( consumer == null ) {
+ send(
requests[i].message.getAcknowledgementRequest(false) );
+ Log.log("WARNING: NACK issued due to non
existent subscription");
+ continue;
+ }
+ requests[i].message.shouldAck = true;
+ consumer.addMessage(requests[i].message);
+ consumersUsed.add(consumer);
+
+ } else {
+
+ LinkedList ll =
(LinkedList)destinationSubscriptions.get( requests[i].message.getJMSDestination() );
+ if( ll == null ) {
+ Log.log("WARNING: Received message but had no
subscribers for it");
+ continue;
+ }
+
+ requests[i].message.shouldAck = false;
+ Iterator iter = ll.iterator();
+ while( iter.hasNext() ) {
+
+ SpyConsumer consumer =
(SpyConsumer)iter.next();
+ consumer.addMessage(requests[i].message);
+ consumersUsed.add(consumer);
+
+ }
+ }
+ }
+
+ Iterator iter = consumersUsed.iterator();
+ while( iter.hasNext() ) {
+ SpyConsumer consumer = (SpyConsumer)iter.next();
+ consumer.processMessages();
+ }
}
- //Gets the first consumer that is listening to a destination.
- public SpyConsumer pickListeningConsumer(Destination dest) throws JMSException
{
+ /**
+ * Called whenever a consumer changes his listening state on a destination.
+ * We see if the consumer change, changed the overall listening state for the
destination.
+ * Creation date: (11/16/2000 2:20:22 PM)
+ * @return org.spydermq.distributed.interfaces.DistributedJMSServer
+ */
+ public void listenerChange(int subscriptionId, boolean state) throws
JMSException {
if (closed)
throw new IllegalStateException("The connection is closed");
if (distributedConnection == null)
createReceiver();
-
- synchronized (destinations) {
-
- ConsumerSet consumerSet = (ConsumerSet) destinations.get(dest);
- if (consumerSet == null || consumerSet.size() == 0) {
- return null;
- } else {
- Iterator i = consumerSet.iterator();
- while (i.hasNext()) {
- SpyConsumer c = (SpyConsumer) i.next();
- if (c.isListening() || c.isReceiving()) {
- return c;
- }
- }
- }
+ try {
+ provider.listenerChange(distributedConnection, subscriptionId,
state);
+ } catch (Exception e) {
+ failureHandler(e, "Cannot contact the JMS server");
}
- return null;
+ }
+ /**
+ * @return org.spydermq.distributed.interfaces.DistributedJMSServer
+ */
+ SpyMessage receive(Subscription sub, long wait) throws JMSException {
+
+ try {
+ return provider.receive(distributedConnection,
sub.subscriptionId, wait);
+ } catch (Exception e) {
+ failureHandler(e, "Cannot create a ConnectionReceiver");
+ return null;
+ }
}
//A consumer does not need to recieve the messages from a Destination
- void removeConsumer(Destination dest, SpyConsumer who) throws JMSException {
+ void removeConsumer(SpyConsumer consumer) throws JMSException {
if (distributedConnection == null)
createReceiver();
- Log.log("Connection: removeSession(dest=" + dest.toString() + ")");
+ Subscription req = consumer.getSubscription();
+ Log.log("Connection: removeSession(dest=" + req.destination + ")");
try {
-
- synchronized (destinations) {
- ConsumerSet consumerSet = (ConsumerSet)
destinations.get(dest);
- if (consumerSet == null)
- throw new RuntimeException("Destination does
not have any consumers.");
-
- consumerSet.remove(who);
+ provider.unsubscribe(distributedConnection,
req.subscriptionId);
+
+ synchronized (subscriptions ) {
- if ( consumerSet.isEmpty() ) {
- destinations.remove(dest);
- provider.unsubscribe(distributedConnection,
dest);
+ subscriptions.remove(new Integer(req.subscriptionId));
+
+ LinkedList ll =
(LinkedList)destinationSubscriptions.get( req.destination );
+ if( ll != null ) {
+ ll.remove( req );
+ if( ll.size() == 0 ) {
+
destinationSubscriptions.remove(req.destination);
+ }
}
-
+
}
} catch (Exception e) {
failureHandler(e, "Cannot unsubscribe to this destination");
}
+ }
+
+ // used to acknowledge a message
+ protected void send(AcknowledgementRequest item) throws JMSException {
+ try {
+ provider.acknowledge(distributedConnection, item);
+ } catch (Exception e) {
+ failureHandler(e, "Cannot acknowlege a message.");
+ }
}
}
1.2 +15 -2 spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java
Index: SpyConnectionConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyConnectionConsumer.java 2000/12/21 22:33:56 1.1
+++ SpyConnectionConsumer.java 2000/12/23 15:48:15 1.2
@@ -19,7 +19,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer,
SpyConsumer {
@@ -35,6 +35,8 @@
LinkedList queue = new LinkedList();
// Is the ConnectionConsumer closed?
boolean closed;
+ // The subscription info the consumer
+ Subscription subscription;
/**
* SpyConnectionConsumer constructor comment.
@@ -47,20 +49,27 @@
this.serverSessionPool = serverSessionPool;
this.maxMessages = maxMessages;
- connection.addConsumer(destination, this);
+ subscription.destination = (SpyDestination)destination;
+ subscription.messageSelector = messageSelector;
+ subscription.durableSubscriptionName = null;
+ subscription.noLocal = false;
+ connection.addConsumer(this);
+
}
public void addMessage(SpyMessage mes) throws JMSException {
queue.addLast(mes);
}
+
+
/**
* close method comment.
*/
public void close() throws javax.jms.JMSException {
- connection.removeConsumer(destination, this);
+ connection.removeConsumer(this);
closed = true;
}
@@ -109,5 +118,9 @@
}
+ }
+
+ public Subscription getSubscription() {
+ return subscription;
}
}
1.10 +11 -14 spyderMQ/src/java/org/spydermq/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- SpyQueueSession.java 2000/12/21 22:33:55 1.9
+++ SpyQueueSession.java 2000/12/23 15:48:15 1.10
@@ -16,21 +16,20 @@
import javax.jms.QueueBrowser;
import javax.jms.DeliveryMode;
import javax.jms.XAQueueSession;
+import javax.jms.MessageListener;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
-import javax.jms.MessageListener;
-
/**
* This class implements javax.jms.QueueSession and javax.jms.XAQueueSession
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SpyQueueSession
extends SpySession
@@ -52,18 +51,14 @@
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
- if (closed) throw new IllegalStateException("The session is closed");
-
- //Not yet implemented
- return null;
+ if (closed) throw new IllegalStateException("The session is closed");
+ return new SpyQueueBrowser(this, queue, null);
}
public QueueBrowser createBrowser(Queue queue,String messageSelector) throws
JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
-
- //Not yet implemented
- return createBrowser(queue);
+ return new SpyQueueBrowser(this, queue, null);
}
public Queue createQueue(String queueName) throws JMSException
@@ -77,7 +72,7 @@
{
if (closed) throw new IllegalStateException("The session is closed");
- SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue);
+ SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue,null);
addConsumer(queue,receiver);
return receiver;
@@ -86,9 +81,11 @@
public QueueReceiver createReceiver(Queue queue, String messageSelector)
throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
-
- //Not yet implemented
- return createReceiver(queue);
+
+ SpyQueueReceiver receiver=new
SpyQueueReceiver(this,queue,messageSelector);
+ addConsumer(queue,receiver);
+
+ return receiver;
}
public QueueSender createSender(Queue queue) throws JMSException
@@ -116,7 +113,7 @@
{
super.setMessageListener(listener);
- sessionConsumer = new SpyQueueReceiver(this, null);
+ sessionConsumer = new SpyQueueReceiver(this, null,null);
}
}
1.10 +7 -1 spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java
Index: SpyDistributedConnection.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- SpyDistributedConnection.java 2000/12/12 20:58:25 1.9
+++ SpyDistributedConnection.java 2000/12/23 15:48:15 1.10
@@ -8,6 +8,7 @@
import java.util.HashMap;
import java.io.Serializable;
+
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
@@ -15,8 +16,9 @@
* This class is the broker point of view on a SpyConnection (it contains a
ConnectionReceiver)
*
* @author Norbert Lataille ([EMAIL PROTECTED])
+ * @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SpyDistributedConnection
implements Serializable
@@ -68,5 +70,9 @@
if (cr!=null && cr instanceof java.rmi.Remote) {
java.rmi.server.UnicastRemoteObject.unexportObject((java.rmi.Remote)cr, true);
}
+ }
+
+ public String toString() {
+ return "SpyDistributedConnection:"+clientID;
}
}
1.8 +59 -61 spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SpyMessageConsumer.java 2000/12/21 22:33:55 1.7
+++ SpyMessageConsumer.java 2000/12/23 15:48:15 1.8
@@ -24,7 +24,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
abstract public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
@@ -34,19 +34,16 @@
MessageListener messageListener;
//Am I closed ?
protected boolean closed;
- //Do I have a selector
- public Selector selector;
- //The message selector
- public String messageSelector;
+ // The subscription structure should be fill out by the decendent
+ Subscription subscription = new Subscription();
+
//List of Pending messages (not yet delivered)
LinkedList messages;
- //Is the consumer sleeping in a receive() ?
- boolean waitInReceive;
+
//The destination this consumer is getting messages from
SpyDestination destination;
- //Am I in noLocal mode ?
- boolean noLocal;
+
// Constructor ---------------------------------------------------
SpyMessageConsumer(SpySession s, SpyDestination dest) {
@@ -54,10 +51,7 @@
destination = dest;
messageListener = null;
closed = false;
- selector = null;
- messageSelector = null;
messages = new LinkedList();
- waitInReceive = false;
}
// Public --------------------------------------------------------
@@ -66,7 +60,7 @@
if (closed)
throw new IllegalStateException("The MessageConsumer is
closed");
- return messageSelector;
+ return subscription.messageSelector;
}
public MessageListener getMessageListener() throws JMSException {
@@ -79,12 +73,11 @@
public void setMessageListener(MessageListener listener) throws JMSException {
if (closed)
throw new IllegalStateException("The MessageConsumer is
closed");
- if (waitInReceive)
+ if (subscription.receiving)
throw new JMSException("This MessageConsumer is waiting in
receive() !");
messageListener = listener;
- //session.run();
}
public Message receive() throws JMSException {
@@ -96,10 +89,12 @@
if (destination == null)
throw new JMSException("No assigned destination.");
- waitInReceive = true;
+ Log.log("Subscription="+subscription);
+ subscription.receiving = true;
+ Log.log("Subscription assignment finished");
- if (!isListening() && this instanceof SpyQueueReceiver)
- session.connection.queueReceive((SpyQueue) destination, 0);
+ if ( this instanceof SpyQueueReceiver ||
subscription.durableSubscriptionName!=null )
+ session.connection.receive(subscription, 0);
synchronized (messages) {
@@ -122,7 +117,7 @@
newE.setLinkedException(e);
throw newE;
} finally {
- waitInReceive = false;
+ subscription.receiving = false;
}
}
@@ -141,10 +136,10 @@
long endTime = System.currentTimeMillis() + timeOut;
- waitInReceive = true;
+ subscription.receiving = true;
- if (!isListening() && this instanceof SpyQueueReceiver)
- session.connection.queueReceive((SpyQueue) destination,
timeOut);
+ if ( this instanceof SpyQueueReceiver ||
subscription.durableSubscriptionName!=null )
+ session.connection.receive(subscription, timeOut);
synchronized (messages) {
@@ -177,7 +172,7 @@
newE.setLinkedException(e);
throw newE;
} finally {
- waitInReceive = false;
+ subscription.receiving = false;
}
}
@@ -191,14 +186,13 @@
if (destination == null)
throw new JMSException("No assigned destination.");
- waitInReceive = true;
+ subscription.receiving = true;
try {
-
- if (!isListening() && this instanceof SpyQueueReceiver) {
+ if ( this instanceof SpyQueueReceiver ||
subscription.durableSubscriptionName!=null ) {
if (session.modeStop)
return null;
- return session.connection.queueReceive((SpyQueue)
destination, -1);
+ return session.connection.receive(getSubscription(),
-1);
}
synchronized (messages) {
@@ -210,19 +204,31 @@
}
} finally {
- waitInReceive = false;
+ subscription.receiving = false;
}
}
- abstract public void close() throws JMSException;
-
- //Package protected - Not part of the spec
+ public void close() throws JMSException {
+
+ synchronized (messages) {
+ if (closed)
+ return;
- void setSelector(Selector selector, String messageSelector) {
- this.selector = selector;
- this.messageSelector = messageSelector;
+ if (destination != null)
+ session.removeConsumer(destination, this);
+
+ if ( subscription.receiving && messageListener == null) {
+ //A consumer could be waiting in receive()
+ messages.notify();
+ }
+
+ closed = true;
+ }
+
}
+
+
SpyMessage getMessage() {
synchronized (messages) {
@@ -239,28 +245,14 @@
continue;
}
- if (selector != null) {
- if (!selector.test(mes)) {
- Log.log("SessionQueue: I
dropped a message (selector)");
- continue;
- } else {
- Log.log("SessionQueue:
selector evaluates TRUE");
- }
- }
-
- if (noLocal &&
mes.producerClientId.equals(session.connection.clientID)) {
- Log.notice("SessionQueue: I dropped a
message (noLocal)");
- continue;
- }
-
//the SAME Message object is put in different
SessionQueues
//when we deliver it, we have to clone() it to
insure independance
SpyMessage message = mes.myClone();
- message.setSpySession(session);
+ message.session = session;
- if (session.transacted) {
+ if (message.shouldAck && session.transacted) {
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId,
message);
- } else if (session.acknowledgeMode ==
session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) {
+ } else if (message.shouldAck &&
session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode ==
session.DUPS_OK_ACKNOWLEDGE) {
message.doAcknowledge();
}
@@ -279,7 +271,15 @@
public void addMessage(SpyMessage mes) throws JMSException {
synchronized (messages) {
//Add a message to the queue
- messages.addLast(mes);
+ if( subscription.accepts(mes, mes.shouldAck) ) {
+ messages.addLast(mes);
+ } else {
+ if( mes.shouldAck ) {
+ Log.log("WARNING: NACK issued. The
subscription did not accept the message");
+ session.connection.send(
mes.getAcknowledgementRequest(false) );
+ }
+ }
+
}
}
@@ -290,7 +290,7 @@
return false;
if (messageListener == null) {
- if (!waitInReceive) {
+ if (!subscription.receiving) {
// If no Listener and No reciver is waiting
for a message
// Then we neg ack the message back to the
server in the queue case.
@@ -300,9 +300,9 @@
while (mes == null) {
Log.log("Got unrequested
message, sending NACK for: " + mes);
- SpyAcknowledgementItem item =
new SpyAcknowledgementItem();
- item.jmsDestination =
mes.getJMSDestination();
- item.jmsMessageID =
mes.getJMSMessageID();
+ AcknowledgementRequest item =
new AcknowledgementRequest();
+ item.destination =
mes.getJMSDestination();
+ item.messageID =
mes.getJMSMessageID();
item.isAck = false;
session.connection.send(item);
@@ -334,13 +334,11 @@
return true;
}
- abstract public boolean isListening();
-
- public boolean isReceiving() {
- return waitInReceive;
- }
-
public void processMessages() throws JMSException {
session.mutex.notifyLock();
+ }
+
+ public Subscription getSubscription() {
+ return subscription;
}
}
1.4 +0 -0 spyderMQ/src/java/org/spydermq/SpyQueueConnection.java
Index: SpyQueueConnection.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueConnection.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyQueueConnection.java 2000/12/21 22:33:55 1.3
+++ SpyQueueConnection.java 2000/12/23 15:48:15 1.4
@@ -24,7 +24,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyQueueConnection
extends SpyConnection
1.3 +29 -9 spyderMQ/src/java/org/spydermq/SpyQueueBrowser.java
Index: SpyQueueBrowser.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueBrowser.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyQueueBrowser.java 2000/12/12 05:58:58 1.2
+++ SpyQueueBrowser.java 2000/12/23 15:48:15 1.3
@@ -9,42 +9,62 @@
import javax.jms.QueueBrowser;
import javax.jms.Queue;
import javax.jms.JMSException;
+
import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.Vector;
/**
* This class implements javax.jms.QueueBrowser
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyQueueBrowser
implements QueueBrowser
{
+
+ boolean closed;
+ // The destination this browser will browse messages from
+ Queue destination;
+ // String Selector
+ String selector;
+ // The QueueSession this was created with
+ SpyQueueSession session;
- //Public
+ SpyQueueBrowser( SpyQueueSession session,Queue destination,String selector ) {
+ this.destination=destination;
+ this.session=session;
+ this.selector=selector;
+ }
public Queue getQueue() throws JMSException
{
- //Nor implemented yet
- return null;
+ return destination;
}
public String getMessageSelector() throws JMSException
{
- //Nor implemented yet
- return null;
+ return selector;
}
public Enumeration getEnumeration() throws JMSException
{
- //Nor implemented yet
- return null;
+ if( closed )
+ throw new JMSException("The QueueBrowser was closed");
+
+ SpyMessage data[] = session.connection.browse(destination, selector);
+ Vector v = new Vector( data.length );
+ for( int i=0; i < data.length; i++ ) {
+ v.addElement(data[i]);
+ }
+ return v.elements();
}
public void close() throws JMSException
{
- //Nor implemented yet
+ closed = true;
return;
}
}
1.2 +2 -4 spyderMQ/src/java/org/spydermq/SpyConsumer.java
Index: SpyConsumer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConsumer.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyConsumer.java 2000/12/21 22:33:56 1.1
+++ SpyConsumer.java 2000/12/23 15:48:15 1.2
@@ -14,16 +14,14 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public interface SpyConsumer
{
// A ConnectionReceiver uses this method to load a Consumer with a message
public void addMessage(SpyMessage mes) throws JMSException;
- // This is used the Connection class (it maintains a list of consumers) to see
who is receiving messages
- public boolean isListening();
- // This is used the Connection class (it maintains a list of consumers) to see
who is receiving messages
- public boolean isReceiving();
// This is called by a ConnectionReceiver after it is finished loading
messages into the consumer.
public void processMessages() throws JMSException;
+ // This is used to know what type of messages the consumer wants
+ public Subscription getSubscription();
}
1.17 +5 -9 spyderMQ/src/java/org/spydermq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- SpySession.java 2000/12/21 22:33:55 1.16
+++ SpySession.java 2000/12/23 15:48:15 1.17
@@ -34,7 +34,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.16 $
+ * @version $Revision: 1.17 $
*/
abstract public class SpySession
implements Runnable, Session, XASession
@@ -48,7 +48,7 @@
//The messageListener for this session
private MessageListener messageListener;
//The connection object to which this session is linked
- protected SpyConnection connection;
+ public SpyConnection connection;
// This consumer is the consumer that receives messages for the MessageListener
// assigned to the session. The SpyConnectionConsumer delivers messages to him
SpyMessageConsumer sessionConsumer;
@@ -213,7 +213,7 @@
{
synchronized (runLock) {
- Log.log("SpySession: Message delivery started");
+ Log.log("SpySession: run()");
boolean done=false;
while (!done) {
@@ -256,8 +256,6 @@
}
}
-
- Log.log("SpySession: Message delivery ended");
}
public synchronized void close() throws JMSException
@@ -389,7 +387,7 @@
Log.log("Session:
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
synchronized (connection) {
- connection.removeConsumer(dest, who );
+ connection.removeConsumer( who );
}
consumers.remove( who );
@@ -405,14 +403,12 @@
consumers.add( who );
}
- connection.addConsumer(dest, who);
+ connection.addConsumer(who);
}
- public SpyConnection getConnection() {
- return connection;
- }
+
//called by a MessageProducer object which needs to publish a message
1.10 +32 -41 spyderMQ/src/java/org/spydermq/SpyMessage.java
Index: SpyMessage.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessage.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- SpyMessage.java 2000/12/13 15:59:09 1.9
+++ SpyMessage.java 2000/12/23 15:48:15 1.10
@@ -6,7 +6,6 @@
*/
package org.spydermq;
-
import javax.jms.Message;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
@@ -23,7 +22,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SpyMessage
implements Serializable, Cloneable, Message, Comparable
@@ -39,30 +38,31 @@
//Header fields
//Set by send() method
public Destination jmsDestination=null;
- private int jmsDeliveryMode=-1;
- private long jmsExpiration=0;
- private int jmsPriority=-1;
- private String jmsMessageID=null;
- private long jmsTimeStamp=0;
+ public int jmsDeliveryMode=-1;
+ public long jmsExpiration=0;
+ public int jmsPriority=-1;
+ public String jmsMessageID=null;
+ public long jmsTimeStamp=0;
//Set by the client
- private boolean jmsCorrelationID=true;
- private String jmsCorrelationIDString=null;
- private byte[] jmsCorrelationIDbyte=null;
- private Destination jmsReplyTo=null;
- private String jmsType=null;
+ public boolean jmsCorrelationID=true;
+ public String jmsCorrelationIDString=null;
+ public byte[] jmsCorrelationIDbyte=null;
+ public Destination jmsReplyTo=null;
+ public String jmsType=null;
//Set by the provider
- private boolean jmsRedelivered=false;
+ public boolean jmsRedelivered=false;
//Properties
- private Hashtable prop;
- private boolean propReadWrite;
+ public Hashtable prop;
+ public boolean propReadWrite;
//Message body
- protected boolean msgReadOnly=false;
+ public boolean msgReadOnly=false;
//For noLocal to be able to tell if this was a locally produced message
public String producerClientId;
// Transient Attributes ------------------------------------------
//For acknowledgment (set on the client side)
- private transient SpySession spySession;
+ public transient SpySession session;
+ public transient boolean shouldAck;
//For ordering in the JMSServerQueue (set on the server side)
public transient long messageId;
@@ -72,7 +72,6 @@
{
prop=new Hashtable();
propReadWrite=true;
- spySession=null;
}
// Public --------------------------------------------------------
@@ -402,10 +401,10 @@
public void acknowledge() throws JMSException
{
- if (spySession==null)
+ if (session==null)
throw new JMSException("This message was not recieved from the
provider");
- if( spySession.acknowledgeMode == spySession.CLIENT_ACKNOWLEDGE )
+ if( session.acknowledgeMode == session.CLIENT_ACKNOWLEDGE )
doAcknowledge();
}
@@ -451,39 +450,31 @@
return 1;
}
return (int)(messageId - sm.messageId);
- }
+ }
public void doAcknowledge() throws JMSException
{
-
- SpyAcknowledgementItem item = new SpyAcknowledgementItem();
- item.jmsDestination = jmsDestination;
- item.jmsMessageID = jmsMessageID;
- item.isAck = true;
-
- spySession.getConnection().send(item);
+ if( shouldAck )
+ session.connection.send(getAcknowledgementRequest(true));
}
public void doNegAcknowledge() throws JMSException
{
- SpyAcknowledgementItem item = new SpyAcknowledgementItem();
- item.jmsDestination = jmsDestination;
- item.jmsMessageID = jmsMessageID;
- item.isAck = false;
-
- spySession.getConnection().send(item);
+ if( shouldAck )
+ session.connection.send(getAcknowledgementRequest(false));
}
+ public AcknowledgementRequest getAcknowledgementRequest(boolean isAck) throws
JMSException
+ {
- public SpySession getSpySession() {
- return spySession;
- }
-
-
- public void setSpySession(SpySession newSpySession) {
- spySession = newSpySession;
+ AcknowledgementRequest item = new AcknowledgementRequest();
+ item.destination = jmsDestination;
+ item.messageID = jmsMessageID;
+ item.isAck = isAck;
+ return item;
+
}
}
1.12 +14 -16 spyderMQ/src/java/org/spydermq/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- SpyTopicSession.java 2000/12/21 22:33:55 1.11
+++ SpyTopicSession.java 2000/12/23 15:48:15 1.12
@@ -13,6 +13,7 @@
import javax.jms.JMSException;
import javax.jms.TopicPublisher;
import javax.jms.TemporaryTopic;
+import javax.jms.MessageListener;
import javax.jms.XATopicSession;
import java.util.Collection;
@@ -26,16 +27,13 @@
import org.spydermq.selectors.Selector;
import org.spydermq.Log;
-
-import javax.jms.MessageListener;
-
/**
* This class implements javax.jms.TopicSession and javax.jms.XATopicSession
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.11 $
+ * @version $Revision: 1.12 $
*/
public class SpyTopicSession
extends SpySession
@@ -74,13 +72,8 @@
{
if (closed) throw new IllegalStateException("The session is closed");
- SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal);
+ SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal,
messageSelector, null);
addConsumer(topic,sub);
-
- if (messageSelector!=null) {
- Selector selector=new Selector(messageSelector);
- sub.setSelector(selector,messageSelector);
- }
return sub;
}
@@ -88,17 +81,22 @@
public TopicSubscriber createDurableSubscriber(Topic topic, String name)
throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
-
- //Not yet implemented
- return createSubscriber(topic);
+
+ SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,false, null,
name);
+ addConsumer(topic,sub);
+
+ return sub;
+
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name,
String messageSelector, boolean noLocal) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
-
- //Not yet implemented
- return createSubscriber(topic);
+
+ SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal,
messageSelector, name);
+ addConsumer(topic,sub);
+
+ return sub;
}
public TopicPublisher createPublisher(Topic topic) throws JMSException
@@ -130,7 +128,7 @@
{
super.setMessageListener(listener);
- sessionConsumer = new SpyTopicSubscriber(this, null, false);
+ sessionConsumer = new SpyTopicSubscriber(this, null, false,null,null);
}
}
1.9 +10 -36 spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
Index: SpyTopicSubscriber.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- SpyTopicSubscriber.java 2000/12/21 22:33:56 1.8
+++ SpyTopicSubscriber.java 2000/12/23 15:48:16 1.9
@@ -20,7 +20,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
public class SpyTopicSubscriber
extends SpyMessageConsumer
@@ -31,16 +31,6 @@
//The topic I registered
private Topic topic;
-
- // Constructor ---------------------------------------------------
-
- SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean noLocal)
- {
- super(session, (SpyTopic)topic);
- this.topic=topic;
- this.noLocal=noLocal;
- }
-
// Public --------------------------------------------------------
public Topic getTopic() throws JMSException
@@ -52,35 +42,19 @@
public boolean getNoLocal() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
- return noLocal;
+ return subscription.noLocal;
}
- //Overrides MessageConsumer
-
- public void close() throws JMSException
+ // Constructor ---------------------------------------------------
+
+ SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean noLocal, String
selector, String durableSubscriptionName)
{
-
- synchronized (messages) {
- if (closed)
- return;
+ super(session, (SpyTopic)topic);
+ this.topic=topic;
- if (topic != null)
- session.removeConsumer(topic, this);
-
- if (waitInReceive && messageListener == null) {
- //A consumer could be waiting in receive()
- messages.notify();
- }
-
- closed = true;
- }
-
- }
-
- /**
- * A topic is allways accepting messages from a destination.
- */
- public boolean isListening() {
- return true;
+ subscription.destination = (SpyDestination)topic;
+ subscription.messageSelector = selector;
+ subscription.durableSubscriptionName = durableSubscriptionName;
+ subscription.noLocal = noLocal;
}
}
1.3 +0 -0 spyderMQ/src/java/org/spydermq/SpyXAResource.java
Index: SpyXAResource.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyXAResource.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyXAResource.java 2000/12/21 22:33:56 1.2
+++ SpyXAResource.java 2000/12/23 15:48:16 1.3
@@ -16,7 +16,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyXAResource implements XAResource {
1.2 +9 -9 spyderMQ/src/java/org/spydermq/SpyXAResourceManager.java
Index: SpyXAResourceManager.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyXAResourceManager.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyXAResourceManager.java 2000/12/19 06:43:32 1.1
+++ SpyXAResourceManager.java 2000/12/23 15:48:16 1.2
@@ -19,7 +19,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyXAResourceManager implements java.io.Serializable {
@@ -65,9 +65,9 @@
TXState state = (TXState) transactions.get(xid);
if (state == null)
throw new JMSException("Invalid transaction id.");
- SpyAcknowledgementItem item = new SpyAcknowledgementItem();
- item.jmsDestination = msg.getJMSDestination();
- item.jmsMessageID = msg.getJMSMessageID();
+ AcknowledgementRequest item = new AcknowledgementRequest();
+ item.destination = msg.getJMSDestination();
+ item.messageID = msg.getJMSMessageID();
item.isAck = true;
state.ackedMessages.addLast(item);
}
@@ -93,8 +93,8 @@
transaction.messages = job;
}
if (state.ackedMessages.size() != 0) {
- SpyAcknowledgementItem job[] = new
SpyAcknowledgementItem[state.ackedMessages.size()];
- job = (SpyAcknowledgementItem[])
state.ackedMessages.toArray(job);
+ AcknowledgementRequest job[] = new
AcknowledgementRequest[state.ackedMessages.size()];
+ job = (AcknowledgementRequest[])
state.ackedMessages.toArray(job);
transaction.acks = job;
}
connection.send(transaction);
@@ -134,8 +134,8 @@
transaction.messages = job;
}
if (state.ackedMessages.size() != 0) {
- SpyAcknowledgementItem job[] = new
SpyAcknowledgementItem[state.ackedMessages.size()];
- job = (SpyAcknowledgementItem[])
state.ackedMessages.toArray(job);
+ AcknowledgementRequest job[] = new
AcknowledgementRequest[state.ackedMessages.size()];
+ job = (AcknowledgementRequest[])
state.ackedMessages.toArray(job);
transaction.acks = job;
}
connection.send(transaction);
@@ -158,8 +158,8 @@
transaction.requestType = transaction.ONE_PHASE_COMMIT_REQUEST;
transaction.xid = null;
if (state.ackedMessages.size() != 0) {
- SpyAcknowledgementItem job[] = new
SpyAcknowledgementItem[state.ackedMessages.size()];
- job = (SpyAcknowledgementItem[])
state.ackedMessages.toArray(job);
+ AcknowledgementRequest job[] = new
AcknowledgementRequest[state.ackedMessages.size()];
+ job = (AcknowledgementRequest[])
state.ackedMessages.toArray(job);
transaction.acks = job;
//Neg Acknowlege all consumed messages
for (int i = 0; i < transaction.acks.length; i++) {
1.2 +1 -1 spyderMQ/src/java/org/spydermq/TransactionRequest.java
Index: TransactionRequest.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/TransactionRequest.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- TransactionRequest.java 2000/12/19 06:43:33 1.1
+++ TransactionRequest.java 2000/12/23 15:48:16 1.2
@@ -13,7 +13,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class TransactionRequest
implements Serializable
@@ -34,6 +34,6 @@
public SpyMessage[] messages;
// messages acknowleged in the transaction
- public SpyAcknowledgementItem[] acks;
+ public AcknowledgementRequest[] acks;
}
1.1 spyderMQ/src/java/org/spydermq/AcknowledgementRequest.java
Index: AcknowledgementRequest.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import java.io.Serializable;
import javax.jms.Destination;
/**
* Used to Acknowledge sent messages.
*
* This class holds the minimum abount of information needed to
* identify a message to the JMSServer.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class AcknowledgementRequest
implements java.io.Serializable
{
public boolean isAck;
public Destination destination=null;
public String messageID=null;
transient public int subscriberId;
public boolean equals(Object o) {
if( !(o instanceof AcknowledgementRequest ) )
return false;
return messageID.equals(((AcknowledgementRequest)o).messageID) &&
destination.equals(((AcknowledgementRequest)o).destination);
}
public int hashCode() {
return messageID.hashCode();
}
}
1.1 spyderMQ/src/java/org/spydermq/ReceiveRequest.java
Index: ReceiveRequest.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import java.io.Serializable;
/**
* This class contians all the data needed to perform a JMS transaction
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class ReceiveRequest
implements Serializable
{
// The message
public SpyMessage message;
// Is this an exlusive message? Then subscriptionId != null
public Integer subscriptionId;
}
1.1 spyderMQ/src/java/org/spydermq/Subscription.java
Index: Subscription.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import java.io.Serializable;
import org.spydermq.selectors.Selector;
/**
* This class contians all the data needed to for a the provider to
* to determine if a message can be routed to a consumer.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class Subscription
implements Serializable
{
// This gets set to a unique value at the SpyConnection
public int subscriptionId;
// the queue we want to subscribe to
public SpyDestination destination;
// the selector which will filter out messages
public String messageSelector;
// this is not null if we want a durable subscription
public String durableSubscriptionName;
// Topics might not want locally produced messages
public boolean noLocal;
// Transient Values
private transient Selector selector;
public transient SpyDistributedConnection dc;
public transient boolean listening;
public transient boolean receiving;
// Determines the consumer would accept the message.
public boolean accepts( SpyMessage message, boolean exclusive ) throws
javax.jms.JMSException {
if( messageSelector != null ) {
if( selector==null )
selector = new Selector(messageSelector);
if( !selector.test(message) )
return false;
}
if ( message.getJMSDestination() instanceof SpyTopic ) {
// In the Topic case we allways deliver unless we have a
noLocal
if( noLocal && message.producerClientId.equals(
dc.getClientID() ) )
return false;
// But if the subscriber is durable, then it acts like a Queue
if( durableSubscriptionName != null ) {
if( !exclusive )
return false;
if( !listening || !receiving )
return false;
}
} else {
if( !exclusive )
return false;
// In the Queue case we only deliver if it is currently
// has a listner or is receiving
if( !listening && !receiving )
return false;
}
return true;
}
}