User: chirino
Date: 01/07/27 17:33:38
Added: src/main/org/jbossmq/server JMSQueue.java JMSTopic.java
QueuedTask.java StartServer.java
Log:
Once again many changes.
- The logic that handled the processing of queue and topic messages
was separated out more to make it easier to follow.
- A QueuedTask class was created to avoid unneeded processing of queues.
- The interface between the client-server-queues-persistence manager to handle
DurableSubscription was too verbose, so a DurableSubscription class was created, and now
SpyTopics can be inspected to see if they are being used as a DurableSubscription.
- The MBeans that add queues and topics make it simpler to configure a queue/topic.
Revision Changes Path
1.1 jbossmq/src/main/org/jbossmq/server/JMSQueue.java
Index: JMSQueue.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.server;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.DeliveryMode;
import java.util.Iterator;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.TreeSet;
import org.jbossmq.*;
/**
* This class is a message queue which is stored (hashed by Destination) on the
* JMS provider
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class JMSQueue extends JMSDestination {
// Constructor ---------------------------------------------------
JMSQueue(SpyDestination dest,ClientConsumer temporary,JMSServer server) throws
JMSException
{
super( dest, temporary, server );
exclusiveQueue = new ExclusiveQueue(server);
// If this is a non-temp queue, then we should persist data
if( temporaryDestination == null ) {
server.getPersistenceManager().initQueue(dest);
}
}
public void addMessage(SpyMessage mes, Long txId) throws JMSException
{
cat.debug(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT &&
temporaryDestination!=null ) {
throw new JMSException("Cannot write a persistent message to a
temporary destination!");
}
//Number the message so that we can preserve order of delivery.
synchronized(this) {
mes.messageId = messageIdCounter++;
}
if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT )
server.getPersistenceManager().add(mes, txId);
exclusiveQueue.addMessage(mes, txId);
}
public SpyMessage[] browse(String selector) throws JMSException {
return exclusiveQueue.browse( selector );
}
public String toString() {
return "JMSDestination:"+destination;
}
//list of messages
ExclusiveQueue exclusiveQueue;
// Package protected ---------------------------------------------
JMSDestination addConsumer(Subscription sub,ClientConsumer c) throws
JMSException {
cat.debug("Adding consumer: "+c+")");
exclusiveQueue.addConsumer(c);
return this;
}
public void destroy() throws JMSException {
server.getPersistenceManager().destroyQueue(destination);
}
/**
* notifyMessageAvailable method comment.
*/
public void notifyMessageAvailable() {
exclusiveQueue.notifyMessageAvailable();
}
/**
* receiveNoWait method comment.
*/
public org.jbossmq.SpyMessage receiveNoWait(org.jbossmq.Subscription sub)
throws javax.jms.JMSException {
return exclusiveQueue.receiveNoWait(sub);
}
// Package protected ---------------------------------------------
void removeConsumer(Subscription sub,ClientConsumer c) throws JMSException {
cat.debug("Removing consumer: "+c+")");
exclusiveQueue.removeConsumer(c);
}
//Used to put a message that was added previously to the queue, back in the
queue
public void restoreMessage(SpyMessage mes)
{
cat.debug(""+this+"->restoreMessage(mes="+mes+")");
synchronized(this) {
messageIdCounter = Math.max(messageIdCounter, mes.messageId+1);
}
exclusiveQueue.restoreMessage(mes);
}
}
1.1 jbossmq/src/main/org/jbossmq/server/JMSTopic.java
Index: JMSTopic.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.server;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.DeliveryMode;
import java.util.Iterator;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.TreeSet;
import org.jbossmq.*;
/**
* This class is a message topic which is stored (hashed by Destination) on the
* JMS provider
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class JMSTopic extends JMSDestination {
//Hashmap of ExclusiveQueues
HashMap exclusiveQueues = new HashMap();
//ShareQueue used for topics
SharedQueue sharedQueue;
// Constructor ---------------------------------------------------
JMSTopic(SpyDestination dest,ClientConsumer temporary,JMSServer server) throws
JMSException
{
super( dest, temporary, server );
sharedQueue = new SharedQueue(server);
}
public void addMessage(SpyMessage mes, Long txId) throws JMSException
{
cat.debug(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT &&
temporaryDestination!=null ) {
throw new JMSException("Cannot write a persistent message to a
temporary destination!");
}
//Number the message so that we can preserve order of delivery.
synchronized(this) {
mes.messageId = messageIdCounter++;
}
sharedQueue.addMessage(mes, txId);
synchronized (exclusiveQueues) {
if( exclusiveQueues.size() == 0 )
return;
Iterator iter = exclusiveQueues.values().iterator();
while( iter.hasNext() ) {
JMSQueue eq = (JMSQueue)iter.next();
SpyMessage clone = mes.myClone();
clone.setJMSDestination( eq.destination );
eq.addMessage(clone, txId);
}
}
}
// Package protected ---------------------------------------------
JMSDestination addConsumer(Subscription sub,ClientConsumer c) throws
JMSException {
SpyTopic topic = (SpyTopic)sub.destination;
DurableSubcriptionID id = topic.getDurableSubscriptionID();
if( id!=null ) {
server.getStateManager().setDurableSubscription(server, id,
(SpyTopic)sub.destination);
JMSQueue queue = getDurableSubscription(id);
if( queue == null ) {
server.getStateManager().setDurableSubscription(server, id, topic);
queue = getDurableSubscription(id);
if( queue == null ) {
throw new JMSException("Could not create a the
durable subscription.");
}
}
return queue.addConsumer(sub, c);
} else {
cat.debug("Adding consumer: "+c);
sharedQueue.addConsumer(c);
return this;
}
}
public void createDurableSubscription(DurableSubcriptionID sub) throws
JMSException {
if( temporaryDestination != null )
throw new JMSException("Not a valid operation on a temporary
topic");
SpyTopic dstopic = new SpyTopic( (SpyTopic)destination, sub);
synchronized (exclusiveQueues) {
exclusiveQueues.put(sub, new JMSQueue(dstopic, null, server));
}
}
public void destoryDurableSubscription(DurableSubcriptionID id) throws
JMSException
{
synchronized (exclusiveQueues) {
((JMSQueue)exclusiveQueues.remove(id)).destroy();
}
}
// Package protected ---------------------------------------------
JMSQueue getDurableSubscription(DurableSubcriptionID id) {
synchronized (exclusiveQueues) {
return (JMSQueue)exclusiveQueues.get( id );
}
}
/**
* notifyMessageAvailable method comment.
*/
public void notifyMessageAvailable() {
sharedQueue.notifyMessageAvailable();
}
/**
* receiveNoWait method comment.
*/
public org.jbossmq.SpyMessage receiveNoWait(org.jbossmq.Subscription sub)
throws javax.jms.JMSException {
throw new JMSException("Internal Error");
}
// Package protected ---------------------------------------------
void removeConsumer(Subscription sub,ClientConsumer c) throws JMSException {
cat.debug("Removing consumer: "+c);
sharedQueue.removeConsumer(c);
}
//Used to put a message that was added previously to the queue, back in the
queue
public void restoreMessage(SpyMessage mes)
{
cat.debug("Restoring Message: "+mes);
synchronized(this) {
messageIdCounter = Math.max(messageIdCounter, mes.messageId+1);
}
SpyTopic topic = (SpyTopic)mes.getJMSDestination();
JMSQueue eq = getDurableSubscription(topic.getDurableSubscriptionID());
eq.restoreMessage(mes);
}
}
1.1 jbossmq/src/main/org/jbossmq/server/QueuedTask.java
Index: QueuedTask.java
===================================================================
package org.jbossmq.server;
import EDU.oswego.cs.dl.util.concurrent.Executor;
/**
* This class allows us to reduce the number of times
* we add a task to a thread pool. Most tasks in the
* server are used to process an object due to some
* modification of it's state. If multiple threads modify
* the state of the object conncurently, The task only
* needs to run once to process the multiple modifications.
*
* @author: Administrator
*/
public class QueuedTask implements Runnable {
Runnable task;
boolean isQueued=false;
static org.apache.log4j.Category cat =
org.apache.log4j.Category.getInstance(QueuedTask.class);
public QueuedTask( Runnable task ) {
this.task = task;
}
synchronized public void executeWith(Executor e) throws InterruptedException {
if( !isQueued ) {
cat.debug("Task was not queued in the executor previously,
adding to executor");
isQueued = true;
e.execute(this);
} else {
cat.debug("Task was queued previously.");
}
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public void run() {
synchronized( this ) {
isQueued = false;
}
task.run();
}
}
1.10 +13 -15 jbossmq/src/main/org/jbossmq/server/StartServer.java
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development