User: hiram
Date: 00/12/27 14:45:56
Added: src/main/org/jboss/jms/asf StdServerSession.java
StdServerSessionPool.java
StdServerSessionPoolFactory.java
Log:
Bringing spyderMQ up to version 0.6. Added the MDB support for it that Peter had
implemented.
Revision Changes Path
1.1 jboss/src/main/org/jboss/jms/asf/StdServerSession.java
Index: StdServerSession.java
===================================================================
/*
* Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.jboss.jms.asf;
import java.lang.Runnable;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.Session;
import org.jboss.logging.Logger;
/**
* StdServerSession.java
*
*
* Created: Thu Dec 7 18:25:40 2000
*
* @author
* @version
*/
public class StdServerSession implements Runnable, ServerSession {
private StdServerSessionPool serverSessionPool = null;
private Session session = null;
StdServerSession(StdServerSessionPool pool, Session session) throws JMSException{
serverSessionPool = pool;
this.session = session;
}
// --- Impl of JMS standard API
/**
* Implementation of ServerSession.getSession
*
* This simply returns what it has fetched from the connection. It is
* up to the jms provider to typecast it and have a private API to stuff
* messages into it.
*/
public Session getSession()
throws JMSException
{
return session;
}
// implementation of ServerSession.start
public void start()
throws JMSException
{
Logger.debug("Start invokes on server session");
if ( session != null) {
serverSessionPool.getThreadPool().run(this);
}
else {
throw new JMSException("No listener has been specified");
}
}
//--- Protected parts, used by other in the package
/**
* Runs in an own thread, basically calls the session.run(), it is up
* to the session to have been filled with messages and it will run
* against the listener set in StdServerSessionPool. When it has send
* all its messages it returns.
*/
public void run() {
try {
Logger.debug("Invoking run on session");
session.run();
}catch (Exception ex) {
// Log error
}finally {
StdServerSession.this.recycle();
}
}
/**
* This method is called by the ServerSessionPool when it is ready to
* be recycled intot the pool
*/
void recycle()
{
serverSessionPool.recycle(this);
}
} // StdServerSession
1.1 jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java
Index: StdServerSessionPool.java
===================================================================
/*
* Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.jboss.jms.asf;
import java.util.Vector;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.MessageListener;
import javax.jms.TopicConnection;
import javax.jms.QueueConnection;
import javax.jms.Session;
import org.jboss.logging.Logger;
/**
* StdServerSessionPool.java
*
*
* Created: Thu Dec 7 17:02:03 2000
*
* @author
* @version
*/
public class StdServerSessionPool implements ServerSessionPool {
private static final int DEFAULT_POOL_SIZE = 15;
private int poolSize = DEFAULT_POOL_SIZE;
private int ack;
private boolean transacted;
private MessageListener listener;
private Connection con;
private ThreadPool threadPool = new ThreadPool();
private Vector sessionPool = new Vector();
/**
* Minimal constructor, could also have stuff for pool size
*/
public StdServerSessionPool(Connection con, boolean transacted, int ack,
MessageListener listener) throws JMSException{
this(con,transacted,ack,listener,DEFAULT_POOL_SIZE);
/*
this.ack = ack;
this.listener = listener;
this.transacted = transacted;
threadPool.setMaximumSize(poolSize);
init();
*/
}
public StdServerSessionPool(Connection con, boolean transacted, int ack,
MessageListener listener, int maxSession) throws JMSException{
this.con = con;
this.ack = ack;
this.listener = listener;
this.transacted = transacted;
this.poolSize = maxSession;
threadPool.setMaximumSize(poolSize);
init();
Logger.debug("Server Session pool set up");
}
// --- JMS API for ServerSessionPool
// implementation of ServerSessionPool.getServerSession
public ServerSession getServerSession()
throws JMSException {
ServerSession result = null;
Logger.debug("Leaving out a server session");
try {
for (;;) {
if (sessionPool.size() > 0) {
result = (ServerSession)sessionPool.remove(0);
break;
}
else {
try {
synchronized (sessionPool) {
sessionPool.wait();
}
} catch (InterruptedException exception){
// ignore the error
}
}
}
}
catch (Exception exception) {
throw new JMSException("Error in getServerSession " + exception);
}
return result;
}
// --- Protected messages for StdServerSession to use
void recycle(StdServerSession session){
synchronized (sessionPool){
sessionPool.addElement(session);
sessionPool.notifyAll();
}
}
ThreadPool getThreadPool() {
return threadPool;
}
// --- Private methods used internally
private void init() throws JMSException{
for (int index = 0; index < poolSize; index++){
try {
// Here is the meat, that MUST follow the spec
Session ses = null;
if (con instanceof TopicConnection) {
ses = ((TopicConnection)con).createTopicSession(transacted,
ack);
} else if(con instanceof QueueConnection) {
ses = ((QueueConnection)con).createQueueSession(transacted,
ack);
Logger.debug("Creating a QueueSession" + ses);
} else {
Logger.debug("Error in getting session for con" + con);
throw new JMSException("Connection was not reconizable: " +
con);
}
// This might not be totala spec compliant since it
// says that app server should create as many
// message listeners its needs,
Logger.debug("Setting listener for session");
ses.setMessageListener(listener);
sessionPool.addElement(
new StdServerSession(this, ses)
);
}
catch (JMSException exception){
Logger.log("DEBUG Error in adding to pool: " + exception+ " Pool:
" + this + " listener: " + listener);
}
}
}
} // StdServerSessionPool
1.1
jboss/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java
Index: StdServerSessionPoolFactory.java
===================================================================
/*
* Copyright (c) 2000 Peter Antman DN <[EMAIL PROTECTED]>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.jboss.jms.asf;
import java.util.Hashtable;
import javax.jms.ServerSessionPool;
import javax.jms.MessageListener;
import javax.jms.Connection;
import javax.jms.JMSException;
/**
* StdServerSessionPoolFactory.java
*
*
* Created: Fri Dec 22 09:47:41 2000
*
* @author Peter Antman
* @version
*/
public class StdServerSessionPoolFactory implements ServerSessionPoolFactory,
java.io.Serializable {
private Hashtable pools = new Hashtable();
private String name;
public StdServerSessionPoolFactory() {
}
public void setName(String name){this.name = name;}
public String getName(){return name;}
public ServerSessionPool getServerSessionPool(Connection con, int maxSession,
boolean isTransacted, int ack, MessageListener listener) throws JMSException {
/*
This is probably basically fucked up. The ServerSessionPool in
OpenJMS is a Singleton. Every one that uses it will end up in
the same Connection and against the same messagelistener.
*/
// We need a pool, but what should we key on, a guess the adress
// of the listener is the only really uniqe here
String key = listener.toString();// Or hash?
if (pools.containsKey(key)) {
return (ServerSessionPool)pools.get(key);
} else {
ServerSessionPool pool = (ServerSessionPool)new StdServerSessionPool(con,
isTransacted, ack, listener,maxSession);
pools.put(key, pool);
return pool;
}
}
} // StdServerSessionPoolFactory