User: norbert
Date: 00/05/31 11:06:50
Added: src/java/org/spydermq ConnectionQueue.java JMSServer.java
JMSServerQueue.java Log.java
NoReceiverException.java SessionQueue.java
SpyBytesMessage.java SpyConnection.java
SpyConnectionMetaData.java SpyDestination.java
SpyDistributedConnection.java
SpyEncapsulatedMessage.java SpyMapMessage.java
SpyMessage.java SpyMessageConsumer.java
SpyMessageProducer.java SpyObjectMessage.java
SpyQueue.java SpyQueueBrowser.java
SpyQueueConnection.java SpyQueueReceiver.java
SpyQueueSender.java SpyQueueSession.java
SpySession.java SpyStreamMessage.java
SpyTemporaryQueue.java SpyTemporaryTopic.java
SpyTextMessage.java SpyTopic.java
SpyTopicConnection.java SpyTopicPublisher.java
SpyTopicSession.java SpyTopicSubscriber.java
Log:
Change the directory name
Revision Changes Path
1.1 spyderMQ/src/java/org/spydermq/ConnectionQueue.java
Index: ConnectionQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.JMSException;
import javax.jms.Destination;
import java.util.HashSet;
import java.util.Iterator;
/**
* This class holds the subscribed sessions.
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class ConnectionQueue
{
// Attributes ----------------------------------------------------
//My destination
Destination destination;
//the SpySessions linked to this queue
public HashSet subscribers;
//Number of listening sessions
public int NumListeningSessions;
//My SpyConnection
SpyConnection connection;
// Constructor ---------------------------------------------------
ConnectionQueue(Destination destination,SpyConnection connection)
{
subscribers=new HashSet();
this.connection=connection;
this.destination=destination;
NumListeningSessions=0;
}
// Package protected ---------------------------------------------
synchronized void addSession(SpySession session)
{
HashSet newSet=(HashSet)subscribers.clone();
newSet.add(session);
subscribers=newSet;
}
synchronized boolean removeSession(SpySession session)
{
HashSet newSet=(HashSet)subscribers.clone();
newSet.remove(session);
subscribers=newSet;
return subscribers.size()==0;
}
synchronized void changeNumListening(int val) throws JMSException
{
NumListeningSessions+=val;
Log.log("ConnectionQueue:
changeNumListening(sessions="+NumListeningSessions+")");
try {
if (val==-1&&NumListeningSessions==0) {
connection.provider.connectionListening(false,destination,connection.distributedConnection);
} else if (val==1&&NumListeningSessions==1) {
connection.provider.connectionListening(true,destination,connection.distributedConnection);
}
} catch (Exception e) {
connection.failureHandler(e,"Cannot contact the JMS server");
}
}
}
1.1 spyderMQ/src/java/org/spydermq/JMSServer.java
Index: JMSServer.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.JMSException;
import javax.jms.Destination;
import javax.jms.TemporaryTopic;
import javax.jms.TemporaryQueue;
import javax.jms.Topic;
import javax.jms.Queue;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.Iterator;
import org.spydermq.security.SecurityManager;
/**
* This class implements the JMS provider
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class JMSServer
implements Runnable
{
// Constants -----------------------------------------------------
//number of threads in the pool (TO DO: this value should be dynamic)
final int NB_THREADS=1;
// Attributes ----------------------------------------------------
//messages pending for a Destination ( HashMap of JMSServerQueue objects )
private HashMap messageQueue;
//list of tasks pending ( linked list of JMSServerQueue objects )
LinkedList taskQueue; //look when we unregister a temporaryTopic/Queue
//last id given to a client
private int lastID;
//last id given to a temporary topic
private int lastTemporaryTopic;
//last id given to a temporary queue
private int lastTemporaryQueue;
//The security manager
SecurityManager securityManager;
// Constructor ---------------------------------------------------
public JMSServer(SecurityManager securityManager)
{
taskQueue=new LinkedList();
messageQueue=new HashMap();
for(int i=0;i<NB_THREADS;i++)
{
Thread oneThread=new Thread(this);
oneThread.setDaemon(true);
oneThread.setName(new Integer(i).toString());
oneThread.start();
}
lastID=1;
lastTemporaryTopic=1;
this.securityManager=securityManager;
}
// Public --------------------------------------------------------
//This is a correct threading system, but this is not ideal...
//We should let threads cycle through the JMSServerQueue list, and
synchronized on the queue they are working on.
public void run()
{
while (true)
{
JMSServerQueue queue=null;
//Wait (and sleep) until it can find something to do
synchronized (taskQueue)
{
while (queue==null) {
// size() is O(1) in LinkedList...
int size=taskQueue.size();
if (size!=0) {
queue=(JMSServerQueue)taskQueue.removeFirst();
//One other thread can start working
on the task queue...
if (size>1) taskQueue.notify();
} else {
try {
Log.log("I'm going to bed...");
taskQueue.wait();
Log.log("I wake up");
} catch (InterruptedException e) {
}
}
}
}
//Ask the queue to do its job
queue.doMyJob();
}
}
// Administration calls
public SpyTopic newTopic(String name) throws JMSException
{
Log.notice("[JMSServer] new topic : "+name);
SpyTopic newTopic=new SpyTopic(name);
if (messageQueue.containsKey(newTopic)) throw new JMSException("This
topic already exists !");
JMSServerQueue queue=new JMSServerQueue(newTopic,null,this);
//Add this new JMSServerQueue to the list
synchronized (messageQueue) {
HashMap newMap=(HashMap)messageQueue.clone();
newMap.put(newTopic,queue);
messageQueue=newMap;
}
return newTopic;
}
public SpyQueue newQueue(String name) throws JMSException
{
Log.notice("[JMSServer] new queue : "+name);
SpyQueue newQueue=new SpyQueue(name);
if (messageQueue.containsKey(newQueue)) throw new JMSException("This
queue already exists !");
JMSServerQueue queue=new JMSServerQueue(newQueue,null,this);
//Add this new JMSServerQueue to the list
synchronized (messageQueue) {
HashMap newMap=(HashMap)messageQueue.clone();
newMap.put(newQueue,queue);
messageQueue=newMap;
}
return newQueue;
}
// -----------------------------------------
// Callbacks for the invocation layer ------
// -----------------------------------------
//Get a new ClientID for a connection
public String getID()
{
String ID=null;
while (true) {
try {
ID="ID"+(new Integer(lastID++).toString());
securityManager.addClientID(ID);
break;
} catch (Exception e) {
}
}
return ID;
}
//A connection has send a new message
public void newMessage(SpyMessage val[],String id) throws JMSException
{
if (val.length!=1) Log.notice("INCOMING: "+val.length+" messages from
"+id);
for(int i=0;i<val.length;i++) {
Log.notice("INCOMING: "+val[i]+" => "+val[i].jmsDestination);
JMSServerQueue
queue=(JMSServerQueue)messageQueue.get(val[i].jmsDestination);
if (queue==null) throw new JMSException("This destination does
not exist !");
//Add the message to the queue
queue.addMessage(val[i]);
}
}
//A connection object wants to subscribe to a Destination
public void subscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException
{
Log.log("Server:
subscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
if (queue==null) throw new JMSException("This destination does not
exist !");
queue.addSubscriber(dc);
}
public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException
{
Log.log("Server:
unsubscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
if (queue==null) throw new JMSException("This destination does not
exist !");
queue.removeSubscriber(dc);
}
public synchronized SpyTopic createTopic(String name) throws JMSException
{
Log.log("createTopic("+name+")");
SpyTopic newTopic=new SpyTopic(name);
if (!messageQueue.containsKey(newTopic)) throw new JMSException("This
destination does not exist !");
return newTopic;
}
public synchronized SpyQueue createQueue(String name) throws JMSException
{
Log.log("createQueue("+name+")");
SpyQueue newQueue=new SpyQueue(name);
if (!messageQueue.containsKey(newQueue)) throw new JMSException("This
destination does not exist !");
return newQueue;
}
public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection
dc)
{
SpyTemporaryTopic topic=new SpyTemporaryTopic("JMS_TT"+(new
Integer(lastTemporaryTopic++).toString()),dc);
synchronized (messageQueue) {
JMSServerQueue queue=new JMSServerQueue(topic,dc,this);
HashMap newMap=(HashMap)messageQueue.clone();
newMap.put(topic,queue);
messageQueue=newMap;
}
return topic;
}
public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection
dc)
{
SpyTemporaryQueue newQueue=new SpyTemporaryQueue("JMS_TQ"+(new
Integer(lastTemporaryQueue++).toString()),dc);
synchronized (messageQueue) {
JMSServerQueue sessionQueue=new
JMSServerQueue(newQueue,dc,this);
HashMap newMap=(HashMap)messageQueue.clone();
newMap.put(newQueue,sessionQueue);
messageQueue=newMap;
}
return newQueue;
}
//A connection is closing [error or notification]
public synchronized void connectionClosing(SpyDistributedConnection
dc,JMSServerQueue noCheck)
{
if (dc==null) return;
//unregister its clientID
if (dc.getClientID()!=null)
securityManager.removeID(dc.getClientID());
//Remove the connection from the subscribers list
synchronized (messageQueue) {
HashMap newMap=null;
Iterator i=messageQueue.values().iterator();
boolean modified=false; //don't waste memory
while (i.hasNext()) {
JMSServerQueue sq=(JMSServerQueue)i.next();
if (dc.equals(sq.temporaryDestination)) {
if (!modified)
newMap=(HashMap)messageQueue.clone();
newMap.remove(sq.destination);
modified=true;
} else {
if (sq==noCheck) continue;
sq.connectionClosing(dc);
}
}
if (modified) messageQueue=newMap;
}
}
public synchronized void deleteTemporaryDestination(SpyDestination dest)
{
Log.log("JMSServer: deleteDestination(dest="+dest.toString()+")");
synchronized (messageQueue) {
HashMap newMap=(HashMap)messageQueue.clone();
newMap.remove(dest);
messageQueue=newMap;
}
}
public void checkID(String ID) throws JMSException
{
securityManager.addClientID(ID);
}
public SpyMessage queueReceiveNoWait(Queue queue) throws JMSException
{
Log.log("JMSserver: queueReceiveNoWait(queue="+queue+")");
JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(queue);
if (serverQueue==null) throw new JMSException("This destination does
not exist !");
return serverQueue.queueReceiveNoWait();
}
public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws JMSException
{
JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(dest);
if (serverQueue==null) throw new JMSException("This destination does
not exist !");
serverQueue.connectionListening(mode,dc);
}
}
1.1 spyderMQ/src/java/org/spydermq/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.Destination;
import javax.jms.JMSException;
import java.util.Iterator;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.HashMap;
/**
* This class is a message queue which is stored (hashed by Destination) on the
JMS provider
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class JMSServerQueue
{
// Attributes ----------------------------------------------------
//the Destination of this queue
Destination destination;
//DistributedConnection objs that have "registered" to this Destination
private HashMap subscribers;
//List of Pending messages
private LinkedList messages;
//Is a thread already working on this queue ?
//You cannot start two threads on the same destination (correct order of msgs)
private boolean threadWorking;
// Am I already in the task queue ? It is useless to put many times the same
destination in the task queue.
private boolean alreadyInTaskQueue;
//If this is linked to a temporaryDestination,
temporaryDestination=DistributedConnection of the owner, otherwise it's null
SpyDistributedConnection temporaryDestination;
//The JMSServer object
private JMSServer server;
//Am I a queue or a topic
boolean isTopic;
//List of messages waiting for acknowledgment
private LinkedList messagesWaitingForAck;
//Nb of listeners for this Queue
int listeners;
// Constructor ---------------------------------------------------
JMSServerQueue(Destination dest,SpyDistributedConnection temporary,JMSServer
server)
{
destination=dest;
subscribers=new HashMap();
messages=new LinkedList();
threadWorking=false;
alreadyInTaskQueue=false;
temporaryDestination=temporary;
this.server=server;
messagesWaitingForAck=new LinkedList();
isTopic=dest instanceof SpyTopic;
listeners=0;
}
// Package protected ---------------------------------------------
void addSubscriber(SpyDistributedConnection dc) throws JMSException
{
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage
to work concurently
synchronized (destination) {
if
(temporaryDestination!=null&&!temporaryDestination.equals(dc)) throw new
JMSException("You cannot subscriber to this temporary destination");
subscribers.put(dc.getClientID(),dc);
}
}
void removeSubscriber(SpyDistributedConnection dc)
{
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage
to work concurently
synchronized (destination) {
SpyDistributedConnection
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
if (distributedConnection==null) return;
listeners-=distributedConnection.listeners;
subscribers.remove(dc);
}
}
void addMessage(SpyMessage mes) throws JMSException
{
//Add a message to the message list...
synchronized (messages)
{
//Add the message to the queue
//Get the priority
int pri=mes.getJMSPriority();
if (pri<=4) {
//normal priority message
messages.addLast(mes);
} else {
//expedited priority message
int size=messages.size();
int i=0;
for(;i<size;i++) {
if
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
}
messages.add(i,mes);
}
if (isTopic) {
//if a thread is already working on this destination,
I don't have to myself to the taskqueue
if (!threadWorking) notifyWorkers();
} else {
if (listeners!=0&&!threadWorking) notifyWorkers();
}
}
}
//Clear the message queue
synchronized SpyMessage[] startWork()
{
if (threadWorking) throw new RuntimeException("One thread is already
working !");
synchronized (messages) {
SpyMessage[] mes=new SpyMessage[messages.size()];
mes=(SpyMessage[])messages.toArray(mes);
messages.clear();
threadWorking=true;
alreadyInTaskQueue=false;
return mes;
}
}
synchronized SpyMessage startWorkQueue()
{
synchronized (messages) {
threadWorking=true;
alreadyInTaskQueue=false;
if (messages.size()==0) return null;
return (SpyMessage)messages.removeFirst();
}
}
void endWork()
{
//The thread has finished his work...
threadWorking=false;
synchronized (messages) {
if (isTopic) {
//notify another thread if there is work to do !
if (!messages.isEmpty()) notifyWorkers();
} else {
if (listeners!=0&&!messages.isEmpty()) notifyWorkers();
}
}
}
void sendOneMessage(SpyMessage mes)
{
//we can only add/remove a subscribers once the message is sent (
iterator is fail-fast )
synchronized (subscribers) {
if (subscribers.isEmpty()) return;
Iterator i=subscribers.values().iterator();
while (i.hasNext()) {
SpyDistributedConnection
dc=(SpyDistributedConnection)i.next();
try {
dc.cr.receive(destination,mes);
} catch (Exception e) {
Log.error("Cannot deliver this message to the
client "+dc);
Log.error(e);
handleConnectionFailure(dc,i);
}
}
}
}
void sendMultipleMessages(SpyMessage mes[])
{
synchronized (subscribers) {
if (subscribers.isEmpty()) return;
Iterator i=subscribers.values().iterator();
while (i.hasNext()) {
SpyDistributedConnection
dc=(SpyDistributedConnection)i.next();
try {
dc.cr.receiveMultiple(destination,mes);
} catch (Exception e) {
Log.error("Cannot deliver those messages to
the client "+dc);
Log.error(e);
handleConnectionFailure(dc,i);
}
}
}
}
//A connection is closing
void connectionClosing(SpyDistributedConnection dc)
{
if (!subscribers.containsKey(dc.getClientID())) return;
Log.notice("Warning: The DistributedConnection was still registered
for "+destination);
removeSubscriber(dc);
}
void notifyWorkers()
{
//It is useless to put many times the same destination in the task
queue
if (alreadyInTaskQueue) return;
synchronized (server.taskQueue) {
alreadyInTaskQueue=true;
server.taskQueue.addLast(this);
server.taskQueue.notify();
}
}
private void handleConnectionFailure(SpyDistributedConnection dc,Iterator i)
{
//We should try again :) This behavior should under control of a
Failure-Plugin
Log.error("I remove this Connection from the subscribers list");
//Call JMSServer.ConnectionClosing(), but ask him not to check my list.
server.connectionClosing(dc,this);
//remove this connection from the list
if (i!=null) i.remove();
else subscribers.remove(dc.getClientID());
}
void doMyJob()
{
if (isTopic) {
//Clear the message queue
SpyMessage[] msgs=startWork();
//Let the thread do its work
if (msgs.length>1) {
//We can send multiple messages
Log.log("DISPATCH: "+msgs.length+" messages =>
"+destination);
sendMultipleMessages(msgs);
} else {
//Send each message
for(int i=0;i<msgs.length;i++) {
SpyMessage message=(SpyMessage)msgs[i];
Log.log("DISPATCH: "+message+" =>
"+destination);
if (!message.isOutdated())
sendOneMessage(message);
}
}
//Notify that it has finished its work : another thread can
start working on this queue
endWork();
} else {
while (true) {
//Get a receiver
//NL: We could find a better receiver (load balancing
?)
if (listeners==0) break;
Iterator i=subscribers.values().iterator();
SpyDistributedConnection dc=null;
while (i.hasNext()) {
dc=(SpyDistributedConnection)i.next();
if (dc.listeners!=0) break;
}
if (dc==null||dc.listeners==0) {
listeners=0;
Log.error("WARNING: The listeners count was
invalid !");
break;
}
//Get the message
SpyMessage mes=startWorkQueue();
if (mes==null) break;
if (mes.isOutdated()) continue;
//Send the message
try {
dc.cr.receive(destination,mes);
} catch (Exception e) {
Log.error("Cannot deliver this message to the
client "+dc);
Log.error(e);
handleConnectionFailure(dc,null);
}
}
//Notify that it has finished its work : another thread can
start working on this queue
endWork();
}
}
SpyMessage queueReceiveNoWait()
{
synchronized (messages) {
if (messages.size()==0) return null;
return (SpyMessage)messages.removeFirst();
}
}
void connectionListening(boolean mode,SpyDistributedConnection dc) throws
JMSException
{
SpyDistributedConnection
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
if (distributedConnection==null) throw new JMSException("This
DistributedConnection is not registered");
if (mode) {
distributedConnection.listeners++;
listeners++;
if (listeners==1&&!threadWorking)
synchronized (messages) {
if (!messages.isEmpty()) notifyWorkers();
}
} else {
distributedConnection.listeners--;
listeners--;
}
Log.log("Listeners for "+destination+" = "+listeners);
}
}
1.1 spyderMQ/src/java/org/spydermq/Log.java
Index: Log.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
/**
* This is a very basic log system,
* the only functionnality that we need is to be able to shut down the log.
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class Log
{
final static int LOG_EVERYTHING = 1;
final static int LOG_NOTICE = 2;
final static int LOG_ERRORS = 3;
//Change this line change the verbosity level
final static int logType = LOG_EVERYTHING;
private static void print(Object obj)
{
if (obj instanceof String) System.out.println((String)obj);
else if (obj instanceof Exception) ((Exception)obj).printStackTrace();
else System.out.println(obj.toString());
}
//Logs
public static void log(Object obj)
{
if (logType>LOG_EVERYTHING) return;
print(obj);
}
//Notice
public static void notice(Object obj)
{
if (logType>LOG_NOTICE) return;
print(obj);
}
//Errors
public static void error(Object obj)
{
print(obj);
}
}
1.1 spyderMQ/src/java/org/spydermq/NoReceiverException.java
Index: NoReceiverException.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
/**
* This is a very basic log system,
* the only functionnality that we need is to be able to shut down the log.
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
import javax.jms.JMSException;
public class NoReceiverException
extends JMSException
{
public NoReceiverException(String reason, String errorCode)
{
super(reason, errorCode);
}
public NoReceiverException(String reason) {
super(reason);
}
}
1.1 spyderMQ/src/java/org/spydermq/SessionQueue.java
Index: SessionQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.MessageListener;
import javax.jms.MessageConsumer;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Destination;
import java.util.LinkedList;
import java.util.HashSet;
import java.util.Iterator;
import org.spydermq.selectors.Selector;
/**
* This class is a message queue which is stored (hashed by Destination) in the
SpySession object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SessionQueue
{
// Attributes ----------------------------------------------------
//My destination
Destination destination;
//List of messages waiting for acknoledgment
LinkedList messagesWaitingForAck;
//the MessageConsumers linked to this queue
public HashSet subscribers;
//Number of listening receivers
public int NumListeningSubscribers;
//My SpySession
SpySession session;
// Constructor ---------------------------------------------------
SessionQueue(SpySession session,Destination destination)
{
messagesWaitingForAck=new LinkedList();
subscribers=new HashSet();
this.session=session;
this.destination=destination;
NumListeningSubscribers=0;
}
// Package protected ---------------------------------------------
//Send a message from the queue to a MessageConsumer
boolean deliverMessage()
{
boolean result=false;
Iterator i=subscribers.iterator();
while (i.hasNext()) {
SpyMessageConsumer consumer=(SpyMessageConsumer)i.next();
synchronized (consumer.messages) {
if (consumer.messages.size()==0) continue;
if (consumer.messageListener==null) {
if (!consumer.waitInReceive) continue;
consumer.messages.notify();
} else {
SpyMessage mes=consumer.getMessage();
if (mes==null) return false;
consumer.messageListener.onMessage(mes);
}
result=true;
}
}
return result;
}
//A message has been acknowledged
void acknowledge(SpyMessage mes)
{
Log.log("SessionQueue: acknowledge("+mes.toString()+")");
synchronized (messagesWaitingForAck) {
int pos=messagesWaitingForAck.indexOf(mes);
if (pos==-1) return;
for(int i=0;i<=pos;i++)
messagesWaitingForAck.removeFirst();
}
}
//notify the sleeping synchronous listeners
void close() throws JMSException
{
Iterator i=subscribers.iterator();
while (i.hasNext()) {
SpyMessageConsumer consumer=(SpyMessageConsumer)i.next();
consumer.close();
}
}
//the session is about to recover
void recover() throws JMSException
{
synchronized (messagesWaitingForAck) {
while (messagesWaitingForAck.size()!=0) {
//Get the most recent unacknowledged message
SpyMessage
mes=(SpyMessage)messagesWaitingForAck.removeLast();
//This message is redelivered
mes.setJMSRedelivered(true);
//Put the message in one incoming queue - Is it what
the spec says ?
Iterator i=subscribers.iterator();
if (i.hasNext()) {
SpyMessageConsumer
consumer=(SpyMessageConsumer)i.next();
consumer.addMessage(mes);
}
}
}
}
//the session is about to commit, we have to clear our messagesWaitForAck queue
void commit()
{
synchronized (messagesWaitingForAck) {
messagesWaitingForAck.clear();
}
}
synchronized void addConsumer(SpyMessageConsumer consumer)
{
consumer.setSessionQueue(this);
HashSet newSet=(HashSet)subscribers.clone();
newSet.add(consumer);
subscribers=newSet;
}
synchronized boolean removeConsumer(MessageConsumer consumer)
{
HashSet newSet=(HashSet)subscribers.clone();
newSet.remove(consumer);
subscribers=newSet;
return subscribers.size()==0;
}
synchronized void changeNumListening(int val) throws JMSException
{
NumListeningSubscribers+=val;
if
((val==-1&&NumListeningSubscribers==0)||(val==1&&NumListeningSubscribers==1)) {
ConnectionQueue
connectionQueue=(ConnectionQueue)session.connection.destinations.get(destination);
if (connectionQueue==null) throw new JMSException("There is NO
ConnectionQueue for this Destination in the SpyConnection !");
connectionQueue.changeNumListening(val);
}
}
public void dispatchMessage(Destination dest, SpyMessage mes) throws
JMSException
{
Log.log("SessionQueue:
dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
if (session.closed) throw new IllegalStateException("The session is
closed");
if (NumListeningSubscribers==0) throw new NoReceiverException("There
are no receivers for this destination !");
if (mes.isOutdated()) return;
Iterator i=subscribers.iterator();
SpyQueueReceiver receiver=null;
while (i.hasNext()) {
receiver=(SpyQueueReceiver)i.next();
if (receiver.listening) break;
}
if (receiver==null||!receiver.listening) {
NumListeningSubscribers=0;
Log.error("WARNING: The listeners count was invalid !");
throw new NoReceiverException("There are no receivers for this
destination !");
}
synchronized (receiver.messages) {
if (receiver.messageListener==null) {
if (!receiver.waitInReceive) throw new
NoReceiverException("The receiver is not waiting for a message !"); //Try someone else
in the same session
receiver.addMessage(mes);
receiver.messages.notify();
} else {
receiver.addMessage(mes);
receiver.messageListener.onMessage(mes);
}
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyBytesMessage.java
Index: SpyBytesMessage.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
/**
* This class implements javax.jms.BytesMessage
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyBytesMessage
extends SpyMessage
implements Cloneable, BytesMessage
{
// Attributes ----------------------------------------------------
private ByteArrayOutputStream ostream=null;
private DataOutputStream p=null;
private byte[] InternalArray=null;
private ByteArrayInputStream istream=null;
private DataInputStream m=null;
// Constructor ---------------------------------------------------
SpyBytesMessage()
{
msgReadOnly=false;
ostream = new ByteArrayOutputStream();
p = new DataOutputStream(ostream);
}
// Public --------------------------------------------------------
public boolean readBoolean() throws JMSException
{
checkRead();
try {
return m.readBoolean();
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public byte readByte() throws JMSException
{
checkRead();
try {
return m.readByte();
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public int readUnsignedByte() throws JMSException
{
checkRead();
try {
return m.readUnsignedByte();
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public short readShort() throws JMSException
{
checkRead();
try {
return m.readShort();
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public int readUnsignedShort() throws JMSException
{
checkRead();
try {
return m.readUnsignedShort();
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public char readChar() throws JMSException
{
checkRead();
try {
return m.readChar();
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public int readInt() throws JMSException
{
checkRead();
try {
return m.readInt();
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public long readLong() throws JMSException
{
checkRead();
try {
return m.readLong();
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public float readFloat() throws JMSException
{
checkRead();
try {
return m.readFloat();
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public double readDouble() throws JMSException
{
checkRead();
try {
return m.readDouble();
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public String readUTF() throws JMSException
{
checkRead();
try {
return m.readUTF();
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public int readBytes(byte[] value) throws JMSException
{
checkRead();
try {
return m.read(value);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public int readBytes(byte[] value, int length) throws JMSException
{
checkRead();
try {
return m.read(value,0,length);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeBoolean(boolean value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
p.writeBoolean(value);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeByte(byte value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
p.writeByte(value);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeShort(short value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
p.writeShort(value);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeChar(char value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
p.writeChar(value);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeInt(int value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
p.writeInt(value);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeLong(long value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
p.writeLong(value);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeFloat(float value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
p.writeFloat(value);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeDouble(double value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
p.writeDouble(value);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeUTF(String value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
p.writeUTF(value);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeBytes(byte[] value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
p.write(value,0,value.length);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeBytes(byte[] value, int offset, int length) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
p.write(value,offset,length);
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void writeObject(Object value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
try {
if (value instanceof String) p.writeChars((String)value);
else if (value instanceof Boolean)
p.writeBoolean(((Boolean)value).booleanValue());
else if (value instanceof Byte)
p.writeByte(((Byte)value).byteValue());
else if (value instanceof Short)
p.writeShort(((Short)value).shortValue());
else if (value instanceof Integer)
p.writeInt(((Integer)value).intValue());
else if (value instanceof Long)
p.writeLong(((Long)value).longValue());
else if (value instanceof Float)
p.writeFloat(((Float)value).floatValue());
else if (value instanceof Double)
p.writeDouble(((Double)value).doubleValue());
else if (value instanceof byte[])
p.write((byte[])value,0,((byte[])value).length);
else throw new MessageFormatException("Invalid object for
properties");
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void reset() throws JMSException
{
try {
if (!msgReadOnly) {
p.flush();
InternalArray=ostream.toByteArray();
ostream.close();
}
ostream=null;
istream=null;
m=null;
p=null;
msgReadOnly = true;
} catch (IOException e) {
throw new JMSException("IOException");
}
}
public void clearBody() throws JMSException
{
try {
if (!msgReadOnly) ostream.close();
else istream.close();
} catch (IOException e) {
//don't throw an exception
}
ostream=new ByteArrayOutputStream();
p=new DataOutputStream(ostream);
InternalArray=null;
istream=null;
m=null;
super.clearBody();
}
// Package protected ---------------------------------------------
//We need to reset() since this message is going to be cloned/serialized
SpyMessage myClone()
{
try {
reset();
return (SpyMessage)clone();
} catch (Exception e) {
throw new RuntimeException("myClone failed !");
}
}
// Private -------------------------------------------------------
private void checkRead() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("readByte
while the buffer is writeonly");
//We have just received/reset() the message, and the client is trying
to read it
if (istream==null||m==null) {
istream = new ByteArrayInputStream(InternalArray);
m = new DataInputStream(istream);
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyConnection.java
Index: SpyConnection.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Topic;
import javax.jms.Queue;
import javax.jms.JMSException;
import javax.jms.ConnectionMetaData;
import javax.jms.ExceptionListener;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Collection;
import java.util.Properties;
import java.util.Iterator;
import java.io.Serializable;
import java.io.FileInputStream;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.ConnectionReceiverFactory;
/**
* This class implements javax.jms.Connection
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyConnection
implements Connection, Serializable
{
// Attributes ----------------------------------------------------
//This is our connection to the JMS server
protected DistributedJMSServer provider;
//This is the clientID
protected String clientID;
//the distributed object which receives messages from the JMS server
protected SpyDistributedConnection distributedConnection;
//HashMap of ConnectionQueue by Destination
public HashMap destinations;
//LinkedList of all created sessions by this connection
HashSet createdSessions;
//Last message ID returned
private int lastMessageID;
//Is the connection stopped ?
public boolean modeStop;
//Is the connection closed ?
boolean closed;
//Name of the connectionReceiver class
String crClassName;
//the exceptionListener
private ExceptionListener exceptionListener;
// Constructor ---------------------------------------------------
SpyConnection(DistributedJMSServer theServer,String cID,String crCN) throws
JMSException
{
//Set the attributes
provider = theServer;
destinations=new HashMap();
createdSessions=new HashSet();
distributedConnection=null;
closed=false;
lastMessageID=0;
modeStop=true;
clientID=cID;
crClassName=crCN;
}
// Public --------------------------------------------------------
public String getClientID() throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
return clientID;
}
public void setClientID(String cID) throws JMSException
{
if (closed) throw new IllegalStateException("The connection is closed");
if (clientID!=null) throw new IllegalStateException("The connection
has already a clientID");
Log.log("SetClientID("+clientID+")");
try {
provider.checkID(cID);
} catch (JMSException e) {
throw e;
} catch (Exception e) {
failureHandler(e,"Cannot connect to the JMSServer");
}
clientID=cID;
}
public ConnectionMetaData getMetaData() throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
return new SpyConnectionMetaData();
}
public ExceptionListener getExceptionListener() throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
return exceptionListener;
}
public void setExceptionListener(ExceptionListener listener) throws
JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
exceptionListener=listener;
}
public void start() throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
if (!modeStop) return;
modeStop=false;
changeModeStop(modeStop);
}
public void stop() throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
if (modeStop) return;
modeStop=true;
changeModeStop(modeStop);
}
public synchronized void close() throws JMSException
{
if (closed) return;
closed=true;
//Get an ID / ConnectionReciever
if (distributedConnection==null) createReceiver();
//notify his sessions
synchronized (createdSessions) {
Iterator i=createdSessions.iterator();
while (i.hasNext()) {
((SpySession)i.next()).close();
}
}
//Notify the JMSServer that I am closing
try {
provider.connectionClosing(distributedConnection);
ConnectionReceiverFactory.close(distributedConnection);
} catch (Exception e) {
failureHandler(e,"Cannot close properly the connection");
}
}
//called by a TemporaryDestination which is going to be deleted()
public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
Log.log("SpyConnection: deleteDestination(dest="+dest.toString()+")");
try {
//Remove it from the destinations list
synchronized (destinations) {
HashMap newMap=(HashMap)destinations.clone();
newMap.remove(dest);
destinations=newMap;
}
//Notify its sessions that this TemporaryDestination is going
to be deleted()
//We could do that only on the Sessions "linked" to this
Destination
synchronized (createdSessions) {
Iterator i=createdSessions.iterator();
while (i.hasNext()) {
((SpySession)i.next()).deleteTemporaryDestination(dest);
}
}
//Ask the broker to delete() this TemporaryDestination
provider.deleteTemporaryDestination(dest);
} catch (Exception e) {
failureHandler(e,"Cannot delete the TemporaryDestination");
}
}
// Package protected ---------------------------------------------
//Send a message to the provider
//[We should try to locally dispatch the message...]
void sendToServer(SpyMessage mes[]) throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
try {
provider.newMessage(mes,clientID);
/*if (mes.jmsDestination instanceof Topic) {
//If this message is sent to a topic, we can try to
deliver it locally
distributedConnection.cr.receive(mes.jmsDestination,mes);
}*/
} catch (Exception e) {
failureHandler(e,"Cannot send a message to the JMS provider");
}
}
//A Session has created a new MessageConsumer for the Destination dest
void addSession(Destination dest, SpySession who) throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
Log.log("Connection: addSession(dest="+dest.toString()+")");
try {
synchronized (destinations) {
ConnectionQueue
connectionQueue=(ConnectionQueue)destinations.get(dest);
if (connectionQueue==null) {
connectionQueue=new ConnectionQueue(dest,this);
connectionQueue.addSession(who);
HashMap
newDestinations=(HashMap)destinations.clone();
newDestinations.put(dest,connectionQueue);
destinations=newDestinations;
provider.subscribe(dest,distributedConnection);
} else {
connectionQueue.addSession(who);
}
}
} catch (Exception e) {
failureHandler(e,"Cannot subscribe to this Destination");
}
}
//The session does not need to recieve the messages to Destination dest
void removeSession(Destination dest, SpySession who) throws JMSException
{
if (distributedConnection==null) createReceiver();
Log.log("Connection: removeSession(dest="+dest.toString()+")");
try {
synchronized (destinations) {
ConnectionQueue
connectionQueue=(ConnectionQueue)destinations.get(dest);
if (connectionQueue!=null) {
boolean
empty=connectionQueue.removeSession(who);
if (empty) {
HashMap
newDestinations=(HashMap)destinations.clone();
newDestinations.remove(dest);
destinations=newDestinations;
provider.unsubscribe(dest,distributedConnection);
}
} else {
//this should not happen
HashMap
newDestinations=(HashMap)destinations.clone();
newDestinations.remove(dest);
destinations=newDestinations;
provider.unsubscribe(dest,distributedConnection);
}
}
} catch (Exception e) {
failureHandler(e,"Cannot unsubscribe to this destination");
}
}
//Get a new messageID (creation of a new message)
String getNewMessageID() throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
return clientID+"-"+(lastMessageID++);
}
//notify his sessions that he has changed his stopped mode
synchronized void changeModeStop(boolean newValue)
{
synchronized (createdSessions) {
Iterator i=createdSessions.iterator();
while (i.hasNext()) {
((SpySession)i.next()).notifyStopMode(newValue);
}
}
}
//Called by a session when it is closing
void sessionClosing(SpySession who)
{
synchronized (createdSessions)
{
createdSessions.remove(who);
}
//This session should not be in the "destinations" object anymore.
//We could check this, though
}
SpyMessage queueReceiveNoWait(Queue queue) throws JMSException
{
try {
return provider.queueReceiveNoWait(queue);
} catch (Exception e) {
failureHandler(e,"Cannot create a ConnectionReceiver");
return null;
}
}
// Protected -------------------------------------------------------
//create a new Distributed object which receives the messages for this
connection
protected void createReceiver() throws JMSException
{
try {
if (clientID==null) askForAnID();
ConnectionReceiver
receiver=ConnectionReceiverFactory.createConnectionReceiver(this,crClassName);
distributedConnection=new
SpyDistributedConnection(clientID,receiver);
} catch (Exception e) {
failureHandler(e,"Cannot create a ConnectionReceiver");
}
}
//ask the JMS server for a new ID
protected void askForAnID() throws JMSException
{
try {
clientID=provider.getID();
} catch (Exception e) {
failureHandler(e,"Cannot get an ID");
}
}
public void failureHandler(Exception e,String reason) throws JMSException
{
Log.error(e);
JMSException excep=new JMSException(reason);
excep.setLinkedException(e);
if (exceptionListener!=null) {
synchronized (exceptionListener) {
exceptionListener.onException(excep);
}
}
throw excep;
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyConnectionMetaData.java
Index: SpyConnectionMetaData.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
import java.util.Enumeration;
import java.util.Vector;
/**
* This class implements javax.jms.ConnectionMetaData
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyConnectionMetaData
implements ConnectionMetaData
{
// Public --------------------------------------------------------
public String getJMSVersion() throws JMSException
{
return "1.0";
}
public int getJMSMajorVersion() throws JMSException
{
return 1;
}
public int getJMSMinorVersion() throws JMSException
{
return 0;
}
public String getJMSProviderName() throws JMSException
{
return "JBoss";
}
public String getProviderVersion() throws JMSException
{
return "0.1";
}
public int getProviderMajorVersion() throws JMSException
{
return 0;
}
public int getProviderMinorVersion() throws JMSException
{
return 1;
}
public Enumeration getJMSXPropertyNames() throws JMSException
{
Vector vector=new Vector();
vector.add("JMSXGroupID");
vector.add("JMSXGroupSeq");
return vector.elements();
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyDestination.java
Index: SpyDestination.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.Destination;
import java.io.Serializable;
import javax.naming.Referenceable;
/**
* This class implements javax.jms.Destination
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyDestination
implements Destination, Serializable
{
// Attributes ----------------------------------------------------
protected String name;
}
1.1 spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java
Index: SpyDistributedConnection.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import java.io.Serializable;
/**
* This class is the broker point of view on a SpyConnection (it contains a
ConnectionReceiver)
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyDistributedConnection
implements Serializable
{
private String clientID;
public ConnectionReceiver cr;
public transient int listeners;
SpyDistributedConnection(String id,ConnectionReceiver cr_)
{
clientID=id;
cr=cr_;
}
String getClientID()
{
return clientID;
}
public boolean equals(Object obj)
{
if (obj instanceof ConnectionReceiver) return
cr.equals((ConnectionReceiver)obj);
if (obj instanceof SpyDistributedConnection) return
clientID.equals(((SpyDistributedConnection)obj).clientID);
return false;
}
public int hashCode()
{
if (clientID==null) return 0;
return clientID.hashCode();
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyEncapsulatedMessage.java
Index: SpyEncapsulatedMessage.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.Message;
/**
* This Message class is used to send a non 'provider-optimized Message' over the
network [4.4.5]
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyEncapsulatedMessage
extends SpyMessage
{
private Message mes;
SpyEncapsulatedMessage(Message m)
{
//mes=m.clone();
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyMapMessage.java
Index: SpyMapMessage.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.MapMessage;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
import java.util.Enumeration;
import java.util.Hashtable;
/**
* This class implements javax.jms.MapMessage
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyMapMessage
extends SpyMessage
implements MapMessage, Cloneable
{
// Attributes ----------------------------------------------------
private Hashtable content;
// Constructor ---------------------------------------------------
SpyMapMessage()
{
content=new Hashtable();
}
// Public --------------------------------------------------------
public boolean getBoolean(String name) throws JMSException
{
Object value=content.get(name);
if (value==null) return Boolean.getBoolean(null);
if (value instanceof Boolean) return ((Boolean)value).booleanValue();
else if (value instanceof String) return
Boolean.getBoolean((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public byte getByte(String name) throws JMSException
{
Object value=content.get(name);
if (value==null) return Byte.parseByte(null);
if (value instanceof Byte) return ((Byte)value).byteValue();
else if (value instanceof String) return Byte.parseByte((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public short getShort(String name) throws JMSException
{
Object value=content.get(name);
if (value==null) return Short.parseShort(null);
if (value instanceof Byte) return ((Byte)value).shortValue();
else if (value instanceof Short) return ((Short)value).shortValue();
else if (value instanceof String) return
Short.parseShort((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public char getChar(String name) throws JMSException
{
Object value=content.get(name);
if (value==null) throw new NullPointerException("Invalid conversion");
if (value instanceof Character) return ((Character)value).charValue();
else throw new MessageFormatException("Invalid conversion");
}
public int getInt(String name) throws JMSException
{
Object value=content.get(name);
if (value==null) return Integer.parseInt(null);
if (value instanceof Byte) return ((Byte)value).intValue();
else if (value instanceof Short) return ((Short)value).intValue();
else if (value instanceof Integer) return ((Integer)value).intValue();
else if (value instanceof String) return
Integer.parseInt((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public long getLong(String name) throws JMSException
{
Object value=content.get(name);
if (value==null) return Long.parseLong(null);
if (value instanceof Byte) return ((Byte)value).longValue();
else if (value instanceof Short) return ((Short)value).longValue();
else if (value instanceof Integer) return ((Integer)value).longValue();
else if (value instanceof Long) return ((Long)value).longValue();
else if (value instanceof String) return Long.parseLong((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public float getFloat(String name) throws JMSException
{
Object value=content.get(name);
if (value==null) return Float.parseFloat(null);
if (value instanceof Float) return ((Float)value).floatValue();
else if (value instanceof String) return
Float.parseFloat((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public double getDouble(String name) throws JMSException
{
Object value=content.get(name);
if (value==null) return Double.parseDouble(null);
if (value instanceof Float) return ((Float)value).doubleValue();
else if (value instanceof Double) return ((Double)value).doubleValue();
else if (value instanceof String) return
Double.parseDouble((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public String getString(String name) throws JMSException
{
Object value=content.get(name);
if (value==null) return null;
if (value instanceof Boolean) return ((Boolean)value).toString();
else if (value instanceof Byte) return ((Byte)value).toString();
else if (value instanceof Short) return ((Short)value).toString();
else if (value instanceof Character) return
((Character)value).toString();
else if (value instanceof Integer) return ((Integer)value).toString();
else if (value instanceof Long) return ((Long)value).toString();
else if (value instanceof Float) return ((Float)value).toString();
else if (value instanceof Double) return ((Double)value).toString();
else if (value instanceof String) return (String)value;
else throw new MessageFormatException("Invalid conversion");
}
public byte[] getBytes(String name) throws JMSException
{
Object value=content.get(name);
if (value==null) return null;
if (value instanceof byte[]) return (byte[])value;
else throw new MessageFormatException("Invalid conversion");
}
public Object getObject(String name) throws JMSException
{
return content.get(name);
}
public Enumeration getMapNames() throws JMSException
{
return content.keys();
}
public void setBoolean(String name, boolean value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
content.put(name,new Boolean(value));
}
public void setByte(String name, byte value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
content.put(name,new Byte(value));
}
public void setShort(String name, short value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
content.put(name,new Short(value));
}
public void setChar(String name, char value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
content.put(name,new Character(value));
}
public void setInt(String name, int value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
content.put(name,new Integer(value));
}
public void setLong(String name, long value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
content.put(name,new Long(value));
}
public void setFloat(String name, float value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
content.put(name,new Float(value));
}
public void setDouble(String name, double value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
content.put(name,new Double(value));
}
public void setString(String name, String value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
content.put(name,value);
}
public void setBytes(String name, byte[] value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
content.put(name,value.clone());
}
public void setBytes(String name, byte[] value, int offset, int length) throws
JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
if (offset+length>value.length) throw new JMSException("Array is too
small");
byte[] temp = new byte[length];
for(int i=0;i<length;i++)
temp[i]=value[i+offset];
content.put(name,temp);
}
public void setObject(String name, Object value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Message is
ReadOnly !");
if (value instanceof Boolean) content.put(name,value);
else if (value instanceof Byte) content.put(name,value);
else if (value instanceof Short) content.put(name,value);
else if (value instanceof Character) content.put(name,value);
else if (value instanceof Integer) content.put(name,value);
else if (value instanceof Long) content.put(name,value);
else if (value instanceof Float) content.put(name,value);
else if (value instanceof Double) content.put(name,value);
else if (value instanceof String) content.put(name,value);
else if (value instanceof byte[])
content.put(name,((byte[])value).clone());
else throw new MessageFormatException("Invalid object type");
}
public boolean itemExists(String name) throws JMSException
{
return content.containsKey(name);
}
public void clearBody() throws JMSException
{
content=new Hashtable();
super.clearBody();
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyMessage.java
Index: SpyMessage.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.Message;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
import javax.jms.Destination;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Date;
import java.io.Serializable;
/**
* This class implements javax.jms.Message
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyMessage
implements Serializable, Cloneable, Message
{
// Constants -----------------------------------------------------
static final int DEFAULT_DELIVERY_MODE = -1;
static final int DEFAULT_PRIORITY = -1;
static final int DEFAULT_TIME_TO_LIVE = -1;
// Attributes ----------------------------------------------------
//Those attributes are not transient ---------------
//Header fields
//Set by send() method
Destination jmsDestination=null;
private int jmsDeliveryMode=-1;
private long jmsExpiration=0;
private int jmsPriority=-1;
private String jmsMessageID=null;
private long jmsTimeStamp=0;
//Set by the client
private boolean jmsCorrelationID=true;
private String jmsCorrelationIDString=null;
private byte[] jmsCorrelationIDbyte=null;
private Destination jmsReplyTo=null;
private String jmsType=null;
//Set by the provider
private boolean jmsRedelivered=false;
//Properties
private Hashtable prop;
private boolean propReadWrite;
//Message body
protected boolean msgReadOnly=false;
//Those attributes are transient ---------------
//For acknowledgment
private transient SessionQueue mySessionQueue;
//For the storage in the JMSServerQueue object
public transient SpyDistributedConnection originalDistributedConnection;
// Constructor ---------------------------------------------------
SpyMessage()
{
prop=new Hashtable();
propReadWrite=true;
mySessionQueue=null;
}
// Public --------------------------------------------------------
public String getJMSMessageID() throws JMSException
{
return jmsMessageID;
}
public void setJMSMessageID(String id) throws JMSException
{
jmsMessageID=id;
}
public long getJMSTimestamp() throws JMSException
{
return jmsTimeStamp;
}
public void setJMSTimestamp(long timestamp) throws JMSException
{
jmsTimeStamp=timestamp;
}
public byte [] getJMSCorrelationIDAsBytes() throws JMSException
{
if (jmsCorrelationID) throw new JMSException("JMSCorrelationID is a
string");
return jmsCorrelationIDbyte;
}
public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
{
jmsCorrelationID=false;
jmsCorrelationIDbyte=(byte[])correlationID.clone();
jmsCorrelationIDString=null;
}
public void setJMSCorrelationID(String correlationID) throws JMSException
{
jmsCorrelationID=true;
jmsCorrelationIDString=correlationID;
jmsCorrelationIDbyte=null;
}
public String getJMSCorrelationID() throws JMSException
{
if (!jmsCorrelationID) throw new JMSException("JMSCorrelationID is an
array");
return jmsCorrelationIDString;
}
public Destination getJMSReplyTo() throws JMSException
{
return jmsReplyTo;
}
public void setJMSReplyTo(Destination replyTo) throws JMSException
{
jmsReplyTo=replyTo;
}
public Destination getJMSDestination() throws JMSException
{
return jmsDestination;
}
public void setJMSDestination(Destination destination) throws JMSException
{
jmsDestination=destination;
}
public int getJMSDeliveryMode() throws JMSException
{
return jmsDeliveryMode;
}
public void setJMSDeliveryMode(int deliveryMode) throws JMSException
{
jmsDeliveryMode=deliveryMode;
}
public boolean getJMSRedelivered() throws JMSException
{
return jmsRedelivered;
}
public void setJMSRedelivered(boolean redelivered) throws JMSException
{
jmsRedelivered=redelivered;
}
public String getJMSType() throws JMSException
{
return jmsType;
}
public void setJMSType(String type) throws JMSException
{
jmsType=type;
}
public long getJMSExpiration() throws JMSException
{
return jmsExpiration;
}
public void setJMSExpiration(long expiration) throws JMSException
{
jmsExpiration=expiration;
}
public int getJMSPriority() throws JMSException
{
return jmsPriority;
}
public void setJMSPriority(int priority) throws JMSException
{
jmsPriority=priority;
}
public void clearProperties() throws JMSException
{
prop=new Hashtable();
propReadWrite=true;
}
public boolean propertyExists(String name) throws JMSException
{
return prop.containsKey(name);
}
public boolean getBooleanProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
if (value instanceof Boolean) return ((Boolean)value).booleanValue();
else if (value instanceof String) return
Boolean.getBoolean((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public byte getByteProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
if (value instanceof Byte) return ((Byte)value).byteValue();
else if (value instanceof String) return Byte.parseByte((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public short getShortProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
if (value instanceof Byte) return ((Byte)value).shortValue();
else if (value instanceof Short) return ((Short)value).shortValue();
else if (value instanceof String) return
Short.parseShort((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public int getIntProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
if (value instanceof Byte) return ((Byte)value).intValue();
else if (value instanceof Short) return ((Short)value).intValue();
else if (value instanceof Integer) return ((Integer)value).intValue();
else if (value instanceof String) return
Integer.parseInt((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public long getLongProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
if (value instanceof Byte) return ((Byte)value).longValue();
else if (value instanceof Short) return ((Short)value).longValue();
else if (value instanceof Integer) return ((Integer)value).longValue();
else if (value instanceof Long) return ((Long)value).longValue();
else if (value instanceof String) return Long.parseLong((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public float getFloatProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
if (value instanceof Float) return ((Float)value).floatValue();
else if (value instanceof String) return
Float.parseFloat((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public double getDoubleProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
if (value instanceof Float) return ((Float)value).doubleValue();
else if (value instanceof Double) return ((Double)value).doubleValue();
else if (value instanceof String) return
Double.parseDouble((String)value);
else throw new MessageFormatException("Invalid conversion");
}
public String getStringProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) return null;
if (value instanceof Boolean) return ((Boolean)value).toString();
else if (value instanceof Byte) return ((Byte)value).toString();
else if (value instanceof Short) return ((Short)value).toString();
else if (value instanceof Integer) return ((Integer)value).toString();
else if (value instanceof Long) return ((Long)value).toString();
else if (value instanceof Float) return ((Float)value).toString();
else if (value instanceof Double) return ((Double)value).toString();
else if (value instanceof String) return (String)value;
else throw new MessageFormatException("Invalid conversion");
}
public Object getObjectProperty(String name) throws JMSException
{
Object value=prop.get(name);
return value;
}
public Enumeration getPropertyNames() throws JMSException
{
return prop.keys();
}
void CheckPropertyName(String name) throws JMSException
{
if (name.regionMatches(false,0,"JMS_",0,4)) {
throw new JMSException("Bad property name");
}
if (name.regionMatches(false,0,"JMSX",0,4)) {
if (name.equals("JMSXGroupId")) return;
if (name.equals("JMSXGroupSeq")) return;
throw new JMSException("Bad property name");
}
}
public void setBooleanProperty(String name, boolean value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Boolean(value));
}
public void setByteProperty(String name, byte value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Byte(value));
}
public void setShortProperty(String name, short value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Short(value));
}
public void setIntProperty(String name, int value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Integer(value));
}
public void setLongProperty(String name, long value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Long(value));
}
public void setFloatProperty(String name, float value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Float(value));
}
public void setDoubleProperty(String name, double value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Double(value));
}
public void setStringProperty(String name, String value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new String(value));
}
public void setObjectProperty(String name, Object value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
if (value instanceof Boolean) prop.put(name,value);
else if (value instanceof Byte) prop.put(name,value);
else if (value instanceof Short) prop.put(name,value);
else if (value instanceof Integer) prop.put(name,value);
else if (value instanceof Long) prop.put(name,value);
else if (value instanceof Float) prop.put(name,value);
else if (value instanceof Double) prop.put(name,value);
else if (value instanceof String) prop.put(name,value);
else throw new MessageFormatException("Invalid object type");
}
public void clearBody() throws JMSException
{
//Inherited classes clear their content here
msgReadOnly=false;
}
public void acknowledge() throws JMSException
{
//There is no need to acknowledge() this message
if (mySessionQueue==null) return;
mySessionQueue.acknowledge(this);
}
// Package protected ---------------------------------------------
void setSessionQueue(SessionQueue sessionQueue)
{
mySessionQueue=sessionQueue;
}
void setReadOnlyMode()
{
propReadWrite=false;
msgReadOnly=true;
}
SpyMessage myClone()
{
try {
return (SpyMessage)clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeException("myClone failed !");
}
}
boolean isOutdated()
{
if (jmsExpiration==0) return false;
long ts=(new Date()).getTime();
return jmsExpiration<ts;
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.MessageConsumer;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.Session;
import java.util.LinkedList;
import java.util.Date;
import org.spydermq.selectors.Selector;
/**
* This class implements javax.jms.MessageConsumer
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyMessageConsumer
implements MessageConsumer
{
// Attributes ----------------------------------------------------
//Link to my session
protected SpySession session;
//My message listener (null if none)
MessageListener messageListener;
//Am I closed ?
protected boolean closed;
//Do I have a selector
public Selector selector;
//The message selector
public String messageSelector;
//A link to my session queue (in my session)
protected SessionQueue sessionQueue;
//List of Pending messages (not yet delivered)
LinkedList messages;
//Is the consumer sleeping in a receive() ?
boolean waitInReceive;
// Constructor ---------------------------------------------------
SpyMessageConsumer(SpySession s)
{
session=s;
messageListener=null;
closed=false;
selector=null;
messageSelector=null;
messages=new LinkedList();
waitInReceive=false;
}
void setSessionQueue(SessionQueue sessionQueue)
{
this.sessionQueue=sessionQueue;
}
// Public --------------------------------------------------------
public String getMessageSelector() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
return messageSelector;
}
public MessageListener getMessageListener() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
return messageListener;
}
public void setMessageListener(MessageListener listener) throws JMSException
{
//Job is done in the inherited classes
//The QueueReceiver object need to notify their session / connection /
the broker
throw new RuntimeException("pure virtual call");
}
public Message receive() throws JMSException
{
//Job is done in the inherited classes
//The QueueReceiver object need to notify their session / connection /
the broker
throw new RuntimeException("pure virtual call");
}
public Message receive(long timeOut) throws JMSException
{
//Job is done in the inherited classes
//The QueueReceiver object need to notify their session / connection /
the broker
throw new RuntimeException("pure virtual call");
}
public Message receiveNoWait() throws JMSException
{
//Job is done in the inherited classes
//The QueueReceiver object need to notify their session / connection /
the broker
throw new RuntimeException("pure virtual call");
}
public synchronized void close() throws JMSException
{
//Job is done in the inherited classes
//The QueueReceiver object need to notify their session / connection /
the broker
throw new RuntimeException("pure virtual call");
}
//Package protected - Not part of the spec
void setSelector(Selector selector,String messageSelector)
{
this.selector=selector;
this.messageSelector=messageSelector;
}
SpyMessage getMessage()
{
synchronized (messages) {
while (true) {
try {
if (messages.size()==0) return null;
SpyMessage
mes=(SpyMessage)messages.removeFirst();
if (mes.isOutdated()) {
Log.notice("SessionQueue: I dropped a
message (timeout)");
continue;
}
if (selector!=null) {
if (!selector.test(mes)) {
Log.log("SessionQueue: I
dropped a message (selector)");
continue;
} else {
Log.log("SessionQueue:
selector evaluates TRUE");
}
}
//the SAME Message object is put in different
SessionQueues
//when we deliver it, we have to clone() it to
insure independance
SpyMessage message=mes.myClone();
if (!session.transacted) {
if
(session.acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
synchronized
(sessionQueue.messagesWaitingForAck) {
//Put the message in
the messagesWaitForAck queue
sessionQueue.messagesWaitingForAck.addLast(message);
}
message.setSessionQueue(sessionQueue);
} else if
(session.acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
//DUPS_OK_ACKNOWLEDGE
} else {
//AUTO_ACKNOWLEDGE
//we don't need to keep this
message in a queue
}
} else {
//We are linked to a transacted
session
synchronized
(sessionQueue.messagesWaitingForAck) {
//Put the message in the
messagesWaitForAck queue
sessionQueue.messagesWaitingForAck.addLast(message);
}
}
return message;
} catch (Exception e) {
Log.error(e);
}
}
}
}
void addMessage(SpyMessage mes) throws JMSException
{
synchronized (messages) {
//Add a message to the queue
//Test the priority
int pri=mes.getJMSPriority();
if (pri<=4) {
//normal priority message
messages.addLast(mes);
} else {
//expedited priority message
int size=messages.size();
int i=0;
for(;i<size;i++) {
if
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
}
messages.add(i,mes);
}
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyMessageProducer.java
Index: SpyMessageProducer.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.MessageProducer;
import javax.jms.JMSException;
import javax.jms.DeliveryMode;
import javax.jms.Message;
/**
* This class implements javax.jms.MessageProducer
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyMessageProducer
implements MessageProducer
{
// Attributes ----------------------------------------------------
private boolean disableMessageID = false;
private boolean disableTS = false;
protected int defaultDeliveryMode = DeliveryMode.NON_PERSISTENT;
protected int defaultPriority=4;
protected int defaultTTL=0;
// Public --------------------------------------------------------
public void setDisableMessageID(boolean value) throws JMSException
{
disableMessageID=value;
}
public boolean getDisableMessageID() throws JMSException
{
return disableMessageID;
}
public void setDisableMessageTimestamp(boolean value) throws JMSException
{
disableTS=value;
}
public boolean getDisableMessageTimestamp() throws JMSException
{
return disableTS;
}
public void setDeliveryMode(int deli) throws JMSException
{
if (deli==Message.DEFAULT_DELIVERY_MODE)
defaultDeliveryMode=DeliveryMode.NON_PERSISTENT;
else if
(deli!=DeliveryMode.NON_PERSISTENT&&deli!=DeliveryMode.PERSISTENT) throw new
JMSException("Bad DeliveryMode value");
else defaultDeliveryMode=deli;
}
public int getDeliveryMode() throws JMSException
{
return defaultDeliveryMode;
}
public void setPriority(int pri) throws JMSException
{
if (pri==Message.DEFAULT_PRIORITY) defaultPriority=4;
else if (pri<0||pri>9) throw new JMSException("Bad priority value");
else defaultPriority=pri;
}
public int getPriority() throws JMSException
{
return defaultPriority;
}
public void setTimeToLive(int timeToLive) throws JMSException
{
if (timeToLive==Message.DEFAULT_TIME_TO_LIVE) timeToLive=0;
else if (timeToLive<0) throw new JMSException("Bad TimeToLive value");
else defaultTTL=timeToLive;
}
public int getTimeToLive() throws JMSException
{
return defaultTTL;
}
public void close() throws JMSException
{
//Is there anything useful to do ?
//Let the GC do its work !
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyObjectMessage.java
Index: SpyObjectMessage.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.ObjectMessage;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
import javax.jms.MessageFormatException;
import java.io.Serializable;
import java.io.IOException;
import java.io.OptionalDataException;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
/**
* This class implements javax.jms.ObjectMessage
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyObjectMessage
extends SpyMessage
implements Cloneable, ObjectMessage
{
// Attributes ----------------------------------------------------
private byte[] content=null;
// Public --------------------------------------------------------
public void setObject(Serializable object) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("setObject");
try {
ByteArrayOutputStream ostream = new ByteArrayOutputStream();
ObjectOutputStream p = new ObjectOutputStream(ostream);
p.writeObject(object);
p.flush();
content=ostream.toByteArray();
ostream.close();
} catch (IOException e) {
throw new MessageFormatException("Object cannot be
serialized");
}
}
public Serializable getObject() throws JMSException
{
try {
ByteArrayInputStream istream = new
ByteArrayInputStream(content);
ObjectInputStream p = new ObjectInputStream(istream);
Serializable object=(Serializable)p.readObject();
istream.close();
return object;
} catch (OptionalDataException e) {
throw new MessageFormatException("OptionalDataException");
} catch (ClassNotFoundException e) {
throw new MessageFormatException("ClassNotFoundException");
} catch (IOException e) {
throw new MessageFormatException("IOException");
}
}
public void clearBody() throws JMSException
{
content=null;
super.clearBody();
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyQueue.java
Index: SpyQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.Queue;
import javax.jms.JMSException;
import java.io.Serializable;
/**
* This class implements javax.jms.Queue
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyQueue
extends SpyDestination
implements Queue, Serializable
{
// Constructor ---------------------------------------------------
SpyQueue(String queueName)
{
super();
name=queueName;
}
// Public --------------------------------------------------------
public String getQueueName() throws JMSException
{
return name;
}
public String toString()
{
return "Queue@"+name;
}
// Object override -----------------------------------------------
//A topic is identified by its name
public boolean equals(Object obj)
{
if (obj instanceof SpyQueue)
return ((SpyDestination)obj).name.equals(name);
return false;
}
public int hashCode()
{
return name.hashCode()+1;
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyQueueBrowser.java
Index: SpyQueueBrowser.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.QueueBrowser;
import javax.jms.Queue;
import javax.jms.JMSException;
import java.util.Enumeration;
/**
* This class implements javax.jms.QueueBrowser
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyQueueBrowser
implements QueueBrowser
{
//Public
public Queue getQueue() throws JMSException
{
//Nor implemented yet
return null;
}
public String getMessageSelector() throws JMSException
{
//Nor implemented yet
return null;
}
public Enumeration getEnumeration() throws JMSException
{
//Nor implemented yet
return null;
}
public void close() throws JMSException
{
//Nor implemented yet
return;
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyQueueConnection.java
Index: SpyQueueConnection.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.QueueConnection;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.ConnectionConsumer;
import javax.jms.ServerSessionPool;
import javax.jms.TemporaryQueue;
import javax.jms.Queue;
import java.io.Serializable;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
/**
* This class implements javax.jms.QueueConnection
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyQueueConnection
extends SpyConnection
implements Serializable, QueueConnection
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
// Constructor ---------------------------------------------------
public SpyQueueConnection(DistributedJMSServer theServer,String cID,String
crCN) throws JMSException
{
super(theServer,cID,crCN);
}
// Public --------------------------------------------------------
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode)
throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
QueueSession session=new
SpyQueueSession(this,transacted,acknowledgeMode,modeStop);
//add the new session to the createdSessions list
synchronized (createdSessions) {
createdSessions.add(session);
}
return session;
}
public ConnectionConsumer createConnectionConsumer(Queue queue,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws
JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
//Not impelemted yet
return null;
}
// Package protected ---------------------------------------------
void sendToServer(SpyMessage[] c) throws JMSException
{
Log.log("Connection: sendToServer("+c.length+" msgs)");
super.sendToServer(c);
}
TemporaryQueue getTemporaryQueue() throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
try {
return provider.getTemporaryQueue(distributedConnection);
} catch (Exception e) {
failureHandler(e,"Cannot create a temporary queue !");
return null;
}
}
//Get a queue
Queue createQueue(String name) throws JMSException
{
try {
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
return provider.createQueue(name);
} catch (Exception e) {
failureHandler(e,"Cannot get the Queue from the provider");
return null;
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.QueueReceiver;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Date;
/**
* This class implements javax.jms.QueueReceiver
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyQueueReceiver
extends SpyMessageConsumer
implements QueueReceiver
{
// Attributes ----------------------------------------------------
//The queue I registered
private Queue queue;
//Mode of this QueueReceiver
boolean listening;
// Constructor ---------------------------------------------------
SpyQueueReceiver(SpyQueueSession session,Queue queue)
{
super(session);
this.queue=queue;
listening=false;
}
// Public --------------------------------------------------------
public Queue getQueue() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
return queue;
}
public void close() throws JMSException
{
if (closed) return;
closed=true;
setListening(false);
}
//Overrides MessageConsumer
public Message receive() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
setListening(true);
synchronized (messages) {
//if the client follows the specification [4.4.6], he cannot
use this session
//to asynchronously receive a message or receive() in another
thread.
//If a message is already pending for this session, we can
immediatly deliver it
while (true) {
if (closed) {
setListening(false);
return null;
}
if (!session.modeStop) {
Message mes=getMessage();
if (mes!=null) {
setListening(false);
return mes;
}
} else Log.log("the connection is stopped !");
try {
waitInReceive=true;
messages.wait();
} catch (InterruptedException e) {
} finally {
waitInReceive=false;
}
}
}
}
public Message receive(long timeOut) throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
if (timeOut==0) return receive();
long endTime=(new Date()).getTime()+timeOut;
setListening(true);
synchronized (messages) {
//if the client respects the specification [4.4.6], he cannot
use this session
//to asynchronously receive a message or receive() from
another thread.
//If a message is already pending for this session, we can
deliver it
while (true) {
if (closed) {
setListening(false);
return null;
}
if (!session.modeStop) {
Message mes=getMessage();
if (mes!=null) {
setListening(false);
return mes;
}
} else Log.log("the connection is stopped !");
long att=endTime-((new Date()).getTime());
if (att<=0) {
setListening(false);
return null;
}
try {
waitInReceive=true;
messages.wait(att);
} catch (InterruptedException e) {
} finally {
waitInReceive=false;
}
}
}
}
public Message receiveNoWait() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
if (session.modeStop) return null;
return session.connection.queueReceiveNoWait(queue);
}
public void setMessageListener(MessageListener listener) throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
messageListener=listener;
setListening(listener!=null);
}
//---
void setListening(boolean newvalue) throws JMSException
{
if (newvalue==listening) return;
listening=newvalue;
if (listening) sessionQueue.changeNumListening(1);
else sessionQueue.changeNumListening(-1);
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyQueueSender.java
Index: SpyQueueSender.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.QueueSender;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.InvalidDestinationException;
import java.util.Date;
/**
* This class implements javax.jms.QueueSender
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyQueueSender
extends SpyMessageProducer
implements QueueSender
{
// Attributes ----------------------------------------------------
//The session to which this sender is linked
private SpyQueueSession session;
//The queue of this sender
private Queue queue=null;
// Constructor ---------------------------------------------------
SpyQueueSender(SpyQueueSession session,Queue queue)
{
this.session=session;
this.queue=queue;
}
// Public --------------------------------------------------------
public Queue getQueue() throws JMSException
{
return queue;
}
//Send methods
public void send(Message message) throws JMSException
{
if (queue==null) throw new InvalidDestinationException("I do not have
a default Destination !");
send(queue,message,defaultDeliveryMode,defaultPriority,defaultTTL);
}
public void send(Queue queue, Message message) throws JMSException
{
send(queue,message,defaultDeliveryMode,defaultPriority,defaultTTL);
}
public void send(Message message, int deliveryMode, int priority, long
timeToLive) throws JMSException
{
if (queue==null) throw new InvalidDestinationException("I do not have
a default Destination !");
send(queue,message,deliveryMode,priority,timeToLive);
}
public void send(Queue queue, Message mes, int deliveryMode, int priority,
long timeToLive) throws JMSException
{
//We only accept our classes (for now)
if (!(mes instanceof SpyMessage)) throw new JMSException("I cannot
deliver this message");
SpyMessage message=(SpyMessage)mes;
//Set the header fields
message.jmsDestination=queue;
message.setJMSDeliveryMode(deliveryMode);
Date ts=new Date();
message.setJMSTimestamp(ts.getTime());
if (timeToLive==0) {
message.setJMSExpiration(0);
} else {
message.setJMSExpiration(timeToLive+ts.getTime());
}
message.setJMSPriority(priority);
message.setJMSMessageID(session.getNewMessageID());
//Set the properties and the message body in ReadOnly mode
//the client has to call clearProperties() and clearBody() if he wants
to modify those values
message.setReadOnlyMode();
//This message is not redelivered
message.setJMSRedelivered(false);
//We must put a 'new message' in the Session's outgoing queue [3.9]
session.sendMessage(message.myClone());
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.QueueSession;
import javax.jms.Queue;
import javax.jms.Destination;
import javax.jms.QueueReceiver;
import javax.jms.JMSException;
import javax.jms.QueueSender;
import javax.jms.TemporaryQueue;
import javax.jms.QueueBrowser;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
/**
* This class implements javax.jms.QueueSession
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyQueueSession
extends SpySession
implements QueueSession
{
// Constructor ---------------------------------------------------
SpyQueueSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
{
super(myConnection,transacted,acknowledgeMode,stop);
}
// Public --------------------------------------------------------
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
//Not yet implemented
return null;
}
public QueueBrowser createBrowser(Queue queue,String messageSelector) throws
JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
//Not yet implemented
return createBrowser(queue);
}
public Queue createQueue(String queueName) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return ((SpyQueueConnection)connection).createQueue(queueName);
}
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue);
SessionQueue sessionQueue=addConsumer(queue,receiver);
return receiver;
}
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws
JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
//Not yet implemented
return createReceiver(queue);
}
public QueueSender createSender(Queue queue) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return new SpyQueueSender(this,queue);
}
public TemporaryQueue createTemporaryQueue() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return ((SpyQueueConnection)connection).getTemporaryQueue();
}
//Not part of the spec
//Called by the ConnectionReceiver object : put a new msg in the receiver's
queue
public void dispatchMessage(Destination dest,SpyMessage mes) throws
JMSException
{
//Done in the SessionQueue :)
}
// Package protected ---------------------------------------------
//called by a MessageProducer object which needs to send a message
void sendMessage(SpyMessage m) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
//Synchronize with the outgoingQueue
synchronized (outgoingQueue)
{
//Test the priority
int pri=m.getJMSPriority();
if (pri<=4) {
//normal priority message
outgoingQueue.addLast(m);
} else {
//expedited priority message
int size=outgoingQueue.size();
int i=0;
for(;i<size;i++) {
if
(((SpyMessage)outgoingQueue.get(i)).getJMSPriority()<pri) break;
}
outgoingQueue.add(i,m);
}
}
//Notify the [sleeping ?] thread that there is work to do
//We should not wait for the lock...
synchronized (thread)
{
thread.notify();
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpySession.java
Index: SpySession.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.Session;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.BytesMessage;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.MessageListener;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Collection;
/**
* This class implements javax.jms.Session
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpySession
implements Runnable, Session
{
// Attributes ----------------------------------------------------
//Is this session transacted ?
protected boolean transacted;
//What is the type of acknowledgement ?
protected int acknowledgeMode;
//The messageListener for this session
private MessageListener messageListener;
//The connection object to which this session is linked
protected SpyConnection connection;
//HashMap of SessionQueue by Destination
public HashMap destinations;
//The outgoing message queue
protected LinkedList outgoingQueue;
//The outgoing message queue for messages that have been commited (if the
session is transacted)
protected LinkedList outgoingCommitedQueue;
//Is my connection in stopped mode ?
protected boolean modeStop;
//Is the session closed ?
boolean closed;
//This object is the object used to synchronize the session's thread - Need
fixed / improvement
public Integer thread;
//Is this session in alpha mode ?
public boolean alphaMode;
// Constructor ---------------------------------------------------
SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop)
{
connection=conn;
transacted=trans;
acknowledgeMode=acknowledge;
destinations=new HashMap();
outgoingQueue=new LinkedList();
outgoingCommitedQueue=new LinkedList();
modeStop=stop;
messageListener=null;
closed=false;
thread=new Integer(0);
alphaMode=true;
//Start my thread
Thread oneThread=new Thread(this);
oneThread.setDaemon(true);
oneThread.start();
}
// Public --------------------------------------------------------
public BytesMessage createBytesMessage() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return new SpyBytesMessage();
}
public MapMessage createMapMessage() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return new SpyMapMessage();
}
public Message createMessage() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return new SpyMessage();
}
public ObjectMessage createObjectMessage() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return new SpyObjectMessage();
}
public ObjectMessage createObjectMessage(Serializable object) throws
JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
ObjectMessage msg=new SpyObjectMessage();
msg.setObject(object);
return msg;
}
public StreamMessage createStreamMessage() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return new SpyStreamMessage();
}
public TextMessage createTextMessage() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return new SpyTextMessage();
}
public TextMessage createTextMessage(StringBuffer stringBuffer) throws
JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
TextMessage msg=new SpyTextMessage();
msg.setText(stringBuffer.toString());
return msg;
}
public boolean getTransacted() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return transacted;
}
public MessageListener getMessageListener() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return messageListener;
}
public void setMessageListener(MessageListener listener) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
messageListener=listener;
}
//The thread for this session. It sends outgoing messages and delivers
incoming ones
public void run()
{
Log.log("Hi ! I'm a session thread :)");
while (true) {
synchronized (thread) {
boolean doneJob=false;
if (closed) return;
//look at outgoing queues
SpyMessage outgoingJob[]=null;
if (transacted) {
synchronized (outgoingCommitedQueue) {
//The session is transacted, we take
the outgoing msgs from outgoingCommitedQueue
if (outgoingCommitedQueue.size()!=0) {
SpyMessage array[]=new
SpyMessage[outgoingCommitedQueue.size()];
outgoingJob=(SpyMessage[])outgoingCommitedQueue.toArray(array);
outgoingCommitedQueue.clear();
}
}
} else {
synchronized (outgoingQueue) {
//The session is not transacted, we
take the outgoing msgs from outgoingQueue
if (outgoingQueue.size()!=0) {
SpyMessage array[]=new
SpyMessage[outgoingQueue.size()];
outgoingJob=(SpyMessage[])outgoingQueue.toArray(array);
outgoingQueue.clear();
}
}
}
if (outgoingJob!=null) {
try {
//Check for outdated messages !
connection.sendToServer(outgoingJob);
doneJob=true;
} catch (JMSException e) {
Log.log("Cannot send
"+outgoingJob.toString()+" to the provider...");
Log.error(e);
}
}
//if we are not in stopped mode, look at the incoming
queue
if (!modeStop) {
Collection values = destinations.values();
Iterator i=values.iterator();
while (i.hasNext()) {
SessionQueue
sessionQueue=(SessionQueue)i.next();
doneJob=doneJob||sessionQueue.deliverMessage();
}
}
//If there was smthg to do, try again
if (doneJob) continue;
try {
Log.log("SessionThread: I'm going to bed...");
thread.wait();
Log.log("SessionThread: I wake up");
} catch (InterruptedException e) {
}
}
}
}
public synchronized void close() throws JMSException
{
if (closed) return;
closed=true;
//if the thread is sleeping, kill it
synchronized (thread) {
thread.notify();
}
//notify the sleeping synchronous listeners
Collection values = destinations.values();
Iterator i=values.iterator();
while (i.hasNext()) {
SessionQueue sessionQueue=(SessionQueue)i.next();
sessionQueue.close();
}
connection.sessionClosing(this);
}
public void dispatchMessage(Destination dest, SpyMessage mes) throws
JMSException
{
//The job is done in inherited classes
throw new RuntimeException("pure virtual call");
}
//Commit a transacted session
public synchronized void commit() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
if (!transacted) throw new IllegalStateException("The session is not
transacted");
Log.log("Session: commit()");
boolean modeSav=modeStop;
modeStop=true;
//Wait for the thread to sleep
synchronized (thread) {
//Move the outgoing messages from the outgoingQueue to the
outgoingCommitedQueue
outgoingCommitedQueue.addAll(outgoingQueue);
outgoingQueue.clear();
//Notify each SessionQueue that we are going to commit
Collection values = destinations.values();
Iterator i=values.iterator();
while (i.hasNext()) {
SessionQueue sessionQueue=(SessionQueue)i.next();
sessionQueue.commit();
}
//We have finished our work, we can wake up the thread
modeStop=modeSav;
thread.notify();
}
}
//Rollback a transacted session
public synchronized void rollback() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
if (!transacted) throw new IllegalStateException("The session is not
transacted");
Log.log("Session: rollback()");
boolean modeSav=modeStop;
modeStop=true;
//Wait for the thread to sleep
synchronized (thread) {
//Clear the outgoing queue
outgoingQueue.clear();
//Notify each SessionQueue that we are going to rollback
Collection values = destinations.values();
Iterator i=values.iterator();
while (i.hasNext()) {
SessionQueue sessionQueue=(SessionQueue)i.next();
sessionQueue.recover();
}
//We have finished our work, we can wake up the thread
modeStop=modeSav;
thread.notify();
}
}
public synchronized void recover() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
if (transacted) throw new IllegalStateException("The session is
transacted");
Log.log("Session: recover()");
boolean modeSav=modeStop;
modeStop=true;
//Wait for the thread to sleep
synchronized (thread) {
//Notify each SessionQueue that we are going to recover
Collection values = destinations.values();
Iterator i=values.iterator();
while (i.hasNext()) {
SessionQueue sessionQueue=(SessionQueue)i.next();
sessionQueue.recover();
}
//We have finished our work, we can wake up the thread
modeStop=modeSav;
thread.notify();
}
}
public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
{
Log.log("SpySession: deleteDestination(dest="+dest.toString()+")");
//Remove it from the subscribers list
synchronized (destinations) {
HashMap newMap=(HashMap)destinations.clone();
newMap.remove(dest);
destinations=newMap;
}
//We could look at our incoming and outgoing queues to drop messages
}
// Package protected ---------------------------------------------
SessionQueue addConsumer(Destination dest, SpyMessageConsumer who) throws
JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
Log.log("Session:
subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
synchronized (destinations) {
SessionQueue sub=(SessionQueue)destinations.get(dest);
if (sub==null) {
sub=new SessionQueue(this,dest);
sub.addConsumer(who);
HashMap newDestinations=(HashMap)destinations.clone();
newDestinations.put(dest,sub);
destinations=newDestinations;
connection.addSession(dest,this);
} else {
sub.addConsumer(who);
}
return sub;
}
}
void removeConsumer(Destination dest, SpyMessageConsumer who) throws
JMSException
{
Log.log("Session:
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
synchronized (destinations) {
SessionQueue sub=(SessionQueue)destinations.get(dest);
if (sub!=null) {
boolean empty=sub.removeConsumer(who);
if (empty) {
HashMap
newDestinations=(HashMap)destinations.clone();
newDestinations.remove(dest);
destinations=newDestinations;
connection.removeSession(dest,this);
}
} else {
//this should not happen
HashMap newDestinations=(HashMap)destinations.clone();
newDestinations.remove(dest);
destinations=newDestinations;
}
}
}
String getNewMessageID() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return connection.getNewMessageID();
}
//The connection has changed its mode (stop() or start())
//We have to wait until message delivery has stopped or wake up the thread
void notifyStopMode(boolean newValue)
{
if (closed) throw new IllegalStateException("The session is closed");
if (modeStop==newValue) return;
modeStop=newValue;
if (modeStop) {
//Wait for the thread to sleep
synchronized (thread) {
;
}
} else {
//Wake up the thread
synchronized (thread) {
thread.notify();
}
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyStreamMessage.java
Index: SpyStreamMessage.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.StreamMessage;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageNotWriteableException;
import javax.jms.MessageFormatException;
import java.util.Vector;
/**
* This class implements javax.jms.StreamMessage
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyStreamMessage
extends SpyMessage
implements StreamMessage, Cloneable
{
// Attributes ----------------------------------------------------
private Vector content;
private int position;
private int offset;
private int size;
// Constructor ---------------------------------------------------
SpyStreamMessage()
{
msgReadOnly=false;
content=new Vector();
position=0;
size=0;
offset=0;
}
// Public --------------------------------------------------------
public boolean readBoolean() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("The message
body is writeonly");
try {
Object value=content.get(position);
position++;
offset=0;
if (value instanceof Boolean) return
((Boolean)value).booleanValue();
else if (value instanceof String) return
Boolean.getBoolean((String)value);
else throw new MessageFormatException("Invalid conversion");
} catch (ArrayIndexOutOfBoundsException e) {
throw new MessageEOFException("");
}
}
public byte readByte() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("The message
body is writeonly");
try {
Object value=content.get(position);
position++;
offset=0;
if (value instanceof Byte) return ((Byte)value).byteValue();
else if (value instanceof String) return
Byte.parseByte((String)value);
else throw new MessageFormatException("Invalid conversion");
} catch (ArrayIndexOutOfBoundsException e) {
throw new MessageEOFException("");
}
}
public short readShort() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("The message
body is writeonly");
try {
Object value=content.get(position);
position++;
offset=0;
if (value instanceof Byte) return ((Byte)value).shortValue();
else if (value instanceof Short) return
((Short)value).shortValue();
else if (value instanceof String) return
Short.parseShort((String)value);
else throw new MessageFormatException("Invalid conversion");
} catch (ArrayIndexOutOfBoundsException e) {
throw new MessageEOFException("");
}
}
public char readChar() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("The message
body is writeonly");
try {
Object value=content.get(position);
position++;
offset=0;
if (value instanceof Character) return
((Character)value).charValue();
else throw new MessageFormatException("Invalid conversion");
} catch (ArrayIndexOutOfBoundsException e) {
throw new MessageEOFException("");
}
}
public int readInt() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("The message
body is writeonly");
try {
Object value=content.get(position);
position++;
offset=0;
if (value instanceof Byte) return ((Byte)value).intValue();
else if (value instanceof Short) return
((Short)value).intValue();
else if (value instanceof Integer) return
((Integer)value).intValue();
else if (value instanceof String) return
Integer.parseInt((String)value);
else throw new MessageFormatException("Invalid conversion");
} catch (ArrayIndexOutOfBoundsException e) {
throw new MessageEOFException("");
}
}
public long readLong() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("The message
body is writeonly");
try {
Object value=content.get(position);
position++;
offset=0;
if (value instanceof Byte) return ((Byte)value).longValue();
else if (value instanceof Short) return
((Short)value).longValue();
else if (value instanceof Integer) return
((Integer)value).longValue();
else if (value instanceof Long) return
((Long)value).longValue();
else if (value instanceof String) return
Long.parseLong((String)value);
else throw new MessageFormatException("Invalid conversion");
} catch (ArrayIndexOutOfBoundsException e) {
throw new MessageEOFException("");
}
}
public float readFloat() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("The message
body is writeonly");
try {
Object value=content.get(position);
position++;
offset=0;
if (value instanceof Float) return ((Float)value).floatValue();
else if (value instanceof String) return
Float.parseFloat((String)value);
else throw new MessageFormatException("Invalid conversion");
} catch (ArrayIndexOutOfBoundsException e) {
throw new MessageEOFException("");
}
}
public double readDouble() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("The message
body is writeonly");
try {
Object value=content.get(position);
position++;
offset=0;
if (value instanceof Float) return
((Float)value).doubleValue();
else if (value instanceof Double) return
((Double)value).doubleValue();
else if (value instanceof String) return
Double.parseDouble((String)value);
else throw new MessageFormatException("Invalid conversion");
} catch (ArrayIndexOutOfBoundsException e) {
throw new MessageEOFException("");
}
}
public String readString() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("The message
body is writeonly");
try {
Object value=content.get(position);
position++;
offset=0;
if (value instanceof Boolean) return
((Boolean)value).toString();
else if (value instanceof Byte) return
((Byte)value).toString();
else if (value instanceof Short) return
((Short)value).toString();
else if (value instanceof Character) return
((Character)value).toString();
else if (value instanceof Integer) return
((Integer)value).toString();
else if (value instanceof Long) return
((Long)value).toString();
else if (value instanceof Float) return
((Float)value).toString();
else if (value instanceof Double) return
((Double)value).toString();
else if (value instanceof String) return (String)value;
else throw new MessageFormatException("Invalid conversion");
} catch (ArrayIndexOutOfBoundsException e) {
throw new MessageEOFException("");
}
}
public int readBytes(byte[] value) throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("The message
body is writeonly");
try {
Object myObj=content.get(position);
if (!(myObj instanceof byte[])) throw new
MessageFormatException("Invalid conversion");
byte[] obj=(byte[])myObj;
if (obj.length==0) {
position++;
offset=0;
return 0;
}
if (offset>=obj.length) return -1;
if (obj.length-offset<value.length) {
for(int i=0;i<obj.length;i++) value[i]=obj[i+offset];
position++;
offset=0;
return obj.length-offset;
} else {
for(int i=0;i<value.length;i++) value[i]=obj[i+offset];
offset+=value.length;
return value.length;
}
} catch (ArrayIndexOutOfBoundsException e) {
throw new MessageEOFException("");
}
}
public Object readObject() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("The message
body is writeonly");
try {
Object value=content.get(position);
position++;
offset=0;
return value;
} catch (ArrayIndexOutOfBoundsException e) {
throw new MessageEOFException("");
}
}
public void writeBoolean(boolean value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
content.add(new Boolean(value));
}
public void writeByte(byte value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
content.add(new Byte(value));
}
public void writeShort(short value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
content.add(new Short(value));
}
public void writeChar(char value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
content.add(new Character(value));
}
public void writeInt(int value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
content.add(new Integer(value));
}
public void writeLong(long value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
content.add(new Long(value));
}
public void writeFloat(float value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
content.add(new Float(value));
}
public void writeDouble(double value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
content.add(new Double(value));
}
public void writeString(String value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
content.add(new String(value));
}
public void writeBytes(byte[] value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
content.add(value.clone());
}
public void writeBytes(byte[] value, int offset, int length) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
if (offset+length>value.length) throw new JMSException("Array is too
small");
byte[] temp = new byte[length];
for(int i=0;i<length;i++)
temp[i]=value[i+offset];
content.add(temp);
}
public void writeObject(Object value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("The message
body is readonly");
if (value instanceof Boolean) content.add(value);
else if (value instanceof Byte) content.add(value);
else if (value instanceof Short) content.add(value);
else if (value instanceof Character) content.add(value);
else if (value instanceof Integer) content.add(value);
else if (value instanceof Long) content.add(value);
else if (value instanceof Float) content.add(value);
else if (value instanceof Double) content.add(value);
else if (value instanceof String) content.add(value);
else if (value instanceof byte[]) content.add(((byte[])value).clone());
else throw new MessageFormatException("Invalid object type");
}
public void reset() throws JMSException
{
msgReadOnly=true;
position=0;
size=content.size();
offset=0;
}
public void clearBody() throws JMSException
{
content=new Vector();
position=0;
offset=0;
size=0;
super.clearBody();
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyTemporaryQueue.java
Index: SpyTemporaryQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.TemporaryQueue;
import javax.jms.JMSException;
/**
* This class implements javax.jms.TemporaryQueue
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyTemporaryQueue
extends SpyQueue
implements TemporaryQueue
{
//The DistributedConnection of its creator
SpyDistributedConnection dc;
// Constructor ---------------------------------------------------
SpyTemporaryQueue(String queueName,SpyDistributedConnection dc_)
{
super(queueName);
dc=dc_;
}
// Public --------------------------------------------------------
public void delete() throws JMSException
{
try {
dc.cr.deleteTemporaryDestination(this);
} catch (Exception e) {
Log.error(e);
throw new JMSException("Cannot delete the TemporaryQueue");
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyTemporaryTopic.java
Index: SpyTemporaryTopic.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.TemporaryTopic;
import javax.jms.JMSException;
/**
* This class implements javax.jms.TemporaryTopic
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyTemporaryTopic
extends SpyTopic
implements TemporaryTopic
{
//The DistributedConnection of its creator
SpyDistributedConnection dc;
// Constructor ---------------------------------------------------
SpyTemporaryTopic(String topicName, SpyDistributedConnection dc_)
{
super(topicName);
dc=dc_;
}
// Public --------------------------------------------------------
public void delete() throws JMSException
{
try {
dc.cr.deleteTemporaryDestination(this);
} catch (Exception e) {
Log.error(e);
throw new JMSException("Cannot delete the TemporaryTopic");
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyTextMessage.java
Index: SpyTextMessage.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
/**
* This class implements javax.jms.TextMessage
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyTextMessage
extends SpyMessage
implements Cloneable,TextMessage
{
// Attributes ----------------------------------------------------
private String content=null;
// Public --------------------------------------------------------
public void setText(String string) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("Cannot set
the content");
content=string;
}
public String getText() throws JMSException
{
return content;
}
public void clearBody() throws JMSException
{
content=null;
super.clearBody();
}
// Object override -----------------------------------------------
public String toString()
{
try {
return "TextMessage@"+getText();
} catch (JMSException e) {
return "toString() failed !";
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyTopic.java
Index: SpyTopic.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.Topic;
import javax.jms.JMSException;
import java.io.Serializable;
/**
* This class implements javax.jms.Topic
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyTopic
extends SpyDestination
implements Topic, Serializable
{
// Constructor ---------------------------------------------------
SpyTopic(String topicName)
{
super();
name=topicName;
}
// Public --------------------------------------------------------
public String getTopicName() throws JMSException
{
return name;
}
public String toString()
{
return "Topic@"+name;
}
// Object override -----------------------------------------------
//A topic is identified by its name
public boolean equals(Object obj)
{
if (obj instanceof SpyTopic)
return ((SpyDestination)obj).name.equals(name);
return false;
}
public int hashCode()
{
return name.hashCode();
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyTopicConnection.java
Index: SpyTopicConnection.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.TopicConnection;
import javax.jms.JMSException;
import javax.jms.TopicSession;
import javax.jms.TemporaryTopic;
import javax.jms.ConnectionConsumer;
import javax.jms.ServerSessionPool;
import javax.jms.Topic;
import java.io.Serializable;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
/**
* This class implements javax.jms.TopicConnection
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyTopicConnection
extends SpyConnection
implements Serializable, TopicConnection
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
// Constructor ---------------------------------------------------
public SpyTopicConnection(DistributedJMSServer theServer,String cID,String
crCN) throws JMSException
{
super(theServer,cID,crCN);
}
// Public --------------------------------------------------------
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode)
throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
TopicSession session=new
SpyTopicSession(this,transacted,acknowledgeMode,modeStop);
//add the new session to the createdSessions list
synchronized (createdSessions) {
createdSessions.add(session);
}
return session;
}
public ConnectionConsumer createConnectionConsumer(Topic topic,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws
JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
//Not impelemted yet
return null;
}
public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws
JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
//Not impelemted yet
return null;
}
// Package protected ---------------------------------------------
void sendToServer(SpyMessage[] c) throws JMSException
{
Log.log("Connection: sendToServer("+c.length+" msgs)");
super.sendToServer(c);
}
TemporaryTopic getTemporaryTopic() throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
try {
return provider.getTemporaryTopic(distributedConnection);
} catch (Exception e) {
failureHandler(e,"Cannot create a temporary topic !");
return null;
}
}
Topic createTopic(String name) throws JMSException
{
try {
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
return provider.createTopic(name);
} catch (Exception e) {
failureHandler(e,"Cannot get the topic from the provider");
return null;
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyTopicPublisher.java
Index: SpyTopicPublisher.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.TopicPublisher;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.InvalidDestinationException;
import java.util.Date;
/**
* This class implements javax.jms.TopicPublisher
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyTopicPublisher
extends SpyMessageProducer
implements TopicPublisher
{
// Attributes ----------------------------------------------------
//The session to which this publisher is linked
private SpyTopicSession mySession;
//The topic of this publisher
private Topic myTopic=null;
// Constructor ---------------------------------------------------
SpyTopicPublisher(SpyTopicSession s,Topic t)
{
mySession=s;
myTopic=t;
}
// Public --------------------------------------------------------
public Topic getTopic() throws JMSException
{
return myTopic;
}
//Publish methods
public void publish(Message message) throws JMSException
{
if (myTopic==null) throw new InvalidDestinationException("I do not
have a default Destination !");
publish(myTopic,message,defaultDeliveryMode,defaultPriority,defaultTTL);
}
public void publish(Topic topic, Message message) throws JMSException
{
publish(topic,message,defaultDeliveryMode,defaultPriority,defaultTTL);
}
public void publish(Message message, int deliveryMode, int priority, long
timeToLive) throws JMSException
{
if (myTopic==null) throw new InvalidDestinationException("Destination
is null !");
publish(myTopic,message,deliveryMode,priority,timeToLive);
}
public void publish(Topic topic, Message mes, int deliveryMode, int priority,
long timeToLive) throws JMSException
{
//We only accept our classes (for now)
if (!(mes instanceof SpyMessage)) throw new JMSException("I cannot
deliver this message");
SpyMessage message=(SpyMessage)mes;
//Set the header fields
message.jmsDestination=topic;
message.setJMSDeliveryMode(deliveryMode);
Date ts=new Date();
message.setJMSTimestamp(ts.getTime());
if (timeToLive==0) {
message.setJMSExpiration(0);
} else {
message.setJMSExpiration(timeToLive+ts.getTime());
}
message.setJMSPriority(priority);
message.setJMSMessageID(mySession.getNewMessageID());
//Set the properties and the message body in ReadOnly mode
//the client has to call clearProperties() and clearBody() if he wants
to modify those values
message.setReadOnlyMode();
//This message is not redelivered
message.setJMSRedelivered(false);
//We must put a 'new message' in the Session's outgoing queue [3.9]
mySession.sendMessage(message.myClone());
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.TopicSession;
import javax.jms.Topic;
import javax.jms.Destination;
import javax.jms.TopicSubscriber;
import javax.jms.JMSException;
import javax.jms.TopicPublisher;
import javax.jms.TemporaryTopic;
import java.util.Collection;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
import org.spydermq.selectors.Selector;
/**
* This class implements javax.jms.TopicSession
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyTopicSession
extends SpySession
implements TopicSession
{
// Constructor ---------------------------------------------------
SpyTopicSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
{
super(myConnection,transacted,acknowledgeMode,stop);
}
// Public --------------------------------------------------------
public Topic createTopic(String topicName) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return ((SpyTopicConnection)connection).createTopic(topicName);
}
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return createSubscriber(topic,null,false);
}
public TopicSubscriber createSubscriber(Topic topic, String messageSelector,
boolean noLocal) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal);
SessionQueue sessionQueue=addConsumer(topic,sub);
if (messageSelector!=null) {
Selector selector=new Selector(messageSelector);
sub.setSelector(selector,messageSelector);
}
return sub;
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws
JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
//Not yet implemented
return createSubscriber(topic);
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String
messageSelector, boolean noLocal) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
//Not yet implemented
return createSubscriber(topic);
}
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return new SpyTopicPublisher(this,topic);
}
public TemporaryTopic createTemporaryTopic() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return ((SpyTopicConnection)connection).getTemporaryTopic();
}
public void unsubscribe(String name) throws JMSException
{
//Not yet implemented
}
// - Package protected ---------------------------------------------
// - Not part of the spec
//Called by the ConnectionReceiver object : put a new msg in the receiver's
queue
public void dispatchMessage(Destination dest,SpyMessage mes) throws
JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
Log.log("Session:
dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
if (mes.isOutdated()) return;
//Get the SessionQueue for this Destination
SessionQueue sessionQueue=(SessionQueue)destinations.get(dest);
if (sessionQueue==null) return;
//Work on the set of SpyTopicSubscriber for this topic
Iterator i=sessionQueue.subscribers.iterator();
while (i.hasNext()) {
SpyTopicSubscriber sub=(SpyTopicSubscriber)i.next();
sub.addMessage(mes);
}
}
//called by a MessageProducer object which needs to publish a message
void sendMessage(SpyMessage m) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
//Synchronize with the outgoingQueue
synchronized (outgoingQueue)
{
//Test the priority
int pri=m.getJMSPriority();
if (pri<=4) {
//normal priority message
outgoingQueue.addLast(m);
} else {
//expedited priority message
int size=outgoingQueue.size();
int i=0;
for(;i<size;i++) {
if
(((SpyMessage)outgoingQueue.get(i)).getJMSPriority()<pri) break;
}
outgoingQueue.add(i,m);
}
}
//notify the thread that there is work to do
//we should change this...
synchronized (thread)
{
thread.notify();
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
Index: SpyTopicSubscriber.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.TopicSubscriber;
import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Date;
import org.spydermq.selectors.Selector;
/**
* This class implements javax.jms.TopicSubscriber
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyTopicSubscriber
extends SpyMessageConsumer
implements TopicSubscriber
{
// Attributes ----------------------------------------------------
//The topic I registered
private Topic topic;
//Am I in local mode ?
boolean local;
// Constructor ---------------------------------------------------
SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean local)
{
super(session);
this.topic=topic;
this.local=local;
}
// Public --------------------------------------------------------
public Topic getTopic() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
return topic;
}
public boolean getNoLocal() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
return local;
}
//Overrides MessageConsumer
public void close() throws JMSException
{
if (closed) return;
closed=true;
session.removeConsumer(topic,this);
if (waitInReceive&&messageListener==null) {
//A consumer could be waiting in receive()
synchronized (messages) {
messages.notify();
}
}
}
public Message receive() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
synchronized (messages) {
//if the client follows the specification [4.4.6], he cannot
use this session
//to asynchronously receive a message or receive() in another
thread.
//If a message is already pending for this session, we can
immediatly deliver it
while (true) {
if (closed) return null;
if (!session.modeStop) {
Message mes=getMessage();
if (mes!=null) return mes;
} else Log.notice("the connection is stopped !");
try {
waitInReceive=true;
messages.wait();
} catch (InterruptedException e) {
} finally {
waitInReceive=false;
}
}
}
}
public Message receive(long timeOut) throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
if (timeOut==0) return receive();
long endTime=(new Date()).getTime()+timeOut;
synchronized (messages) {
//if the client respects the specification [4.4.6], he cannot
use this session
//to asynchronously receive a message or receive() from
another thread.
//If a message is already pending for this session, we can
deliver it
while (true) {
if (closed) return null;
if (!session.modeStop) {
Message mes=getMessage();
if (mes!=null) return mes;
} else Log.notice("the connection is stopped !");
long att=endTime-((new Date()).getTime());
if (att<=0) return null;
try {
waitInReceive=true;
messages.wait(att);
} catch (InterruptedException e) {
} finally {
waitInReceive=false;
}
}
}
}
public Message receiveNoWait() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
synchronized (messages) {
while (true) {
if (session.modeStop) return null;
return getMessage();
}
}
}
public void setMessageListener(MessageListener listener) throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
messageListener=listener;
//Signal the change to the session thread ( it could sleep, while
there are messages for him )
synchronized (session.thread) {
session.thread.notify();
}
}
}