User: pra
Date: 00/12/06 04:52:36
Added: src/main/org/jboss/jms/asf
OpenJMSServerSessionPoolFactory.java
ServerSessionPoolFactory.java
ServerSessionPoolLoader.java
ServerSessionPoolLoaderMBean.java ThreadPool.java
Log:
Added JMS-specific helpers: MBeans to look up external JMS providers and ASF server session pools
Revision Changes Path
1.1
jboss/src/main/org/jboss/jms/asf/OpenJMSServerSessionPoolFactory.java
Index: OpenJMSServerSessionPoolFactory.java
===================================================================
/*
* Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.jboss.jms.asf;
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.MessageListener;
import javax.jms.ServerSessionPool;
import javax.jms.JMSException;
// It's in that package, but belongs to JBoss ;-)
import org.exolab.jms.client.OpenJMSServerSessionPool;
// The base
import org.exolab.jms.client.JmsServerSessionPool;
import org.jboss.logging.Logger;
/**
 * {@link ServerSessionPoolFactory} implementation backed by OpenJMS.
 *
 * <p>Pools are cached per message listener: the first request for a given
 * listener creates an <code>OpenJMSServerSessionPool</code>, later requests
 * for the same listener get the cached instance back.
 *
 * <p>NOTE(review): the original author warned that the OpenJMS
 * <code>JmsServerSessionPool</code> is a singleton, so every user may end up
 * on the same connection and message listener. Confirm against the OpenJMS
 * sources before relying on per-listener isolation.
 *
 * Created: Wed Nov 29 16:01:04 2000
 *
 * @author
 * @version
 */
public class OpenJMSServerSessionPoolFactory implements ServerSessionPoolFactory,
   java.io.Serializable {

   /** Cache of created pools, keyed by <code>listener.toString()</code>. */
   private Hashtable pools = new Hashtable();

   /** Name of this factory, as set through {@link #setName(String)}. */
   private String name;

   /**
    * Creates the factory and switches the OpenJMS logger to the "debug"
    * level when that level is known to OpenJMS.
    *
    * @throws RuntimeException if the OpenJMS logger cannot be set up
    */
   public OpenJMSServerSessionPoolFactory() {
      // Set up the OpenJMS specific logging
      try {
         org.exolab.core.logger.LoggerIfc logger =
            org.exolab.core.logger.LoggerFactory.create(null, null);
         org.exolab.core.logger.LogEventType event =
            org.exolab.core.logger.LogEventType.getLogEventType("debug");
         if (event != null) {
            logger.setLogLevel(event);
         }
      } catch (Exception ex) {
         Logger.exception(ex);
         throw new RuntimeException("Unable to OpenJMSServerSessionPool: "+ex);
      }
   }

   /**
    * Sets the name of this factory.
    *
    * @param name the new name
    */
   public void setName(String name) {
      this.name = name;
   }

   /**
    * Returns the name of this factory.
    *
    * @return the name, or <code>null</code> if none has been set
    */
   public String getName() {
      return name;
   }

   /**
    * Returns the server session pool for the given listener, creating and
    * caching it on first use.
    *
    * <p>The lookup and the creation happen under the monitor of the cache,
    * so concurrent callers passing the same listener always receive the
    * same pool and the pool is only created once.
    *
    * @param con          connection handed to the new pool
    * @param maxSession   maximum number of sessions handed to the new pool
    * @param isTransacted transaction flag handed to the new pool
    * @param ack          acknowledge mode handed to the new pool
    * @param listener     listener the pool is created for; its
    *                     <code>toString()</code> value is the cache key
    * @return the cached or newly created pool
    * @throws JMSException declared by the factory contract
    */
   public ServerSessionPool getServerSessionPool(Connection con, int maxSession,
      boolean isTransacted, int ack, MessageListener listener) throws JMSException {
      // The string form of the listener is the only reasonably unique key
      // available here.
      String key = listener.toString();

      synchronized (pools) {
         ServerSessionPool pool = (ServerSessionPool)pools.get(key);
         if (pool == null) {
            // The original author found this init call necessary to get the
            // OpenJMS classes to work; the size 15 is the value used there.
            JmsServerSessionPool.init(15, listener);
            pool = (ServerSessionPool)new
               OpenJMSServerSessionPool(con, maxSession, isTransacted, ack, listener);
            pools.put(key, pool);
         }
         return pool;
      }
   }

   public static void main(String[] args) {
   }
} // OpenJMSServerSessionPoolFactory
1.1 jboss/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java
Index: ServerSessionPoolFactory.java
===================================================================
/*
* Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.jboss.jms.asf;
import javax.jms.Connection;
import javax.jms.MessageListener;
import javax.jms.ServerSessionPool;
import javax.jms.JMSException;
/**
 * Contract for provider specific factories that hand out JMS
 * {@link ServerSessionPool} instances.
 *
 * Created: Wed Nov 29 15:55:21 2000
 *
 * @author
 * @version
 */
public interface ServerSessionPoolFactory {

   /**
    * Assigns the name of this factory.
    *
    * @param name the name to use
    */
   void setName(String name);

   /**
    * Returns the name of this factory.
    *
    * @return the name previously assigned with {@link #setName(String)}
    */
   String getName();

   /**
    * Obtains a server session pool for the given connection and listener.
    *
    * @param con          the JMS connection the pool works against
    * @param maxSession   the maximum number of sessions for the pool
    * @param isTransacted whether the sessions are transacted
    * @param ack          acknowledge mode (presumably one of the
    *                     <code>javax.jms.Session</code> constants; confirm
    *                     with the implementations)
    * @param listener     the listener the sessions deliver messages to
    * @return a server session pool
    * @throws JMSException if the pool cannot be provided
    */
   ServerSessionPool getServerSessionPool(
      Connection con,
      int maxSession,
      boolean isTransacted,
      int ack,
      MessageListener listener) throws JMSException;

} // ServerSessionPoolFactory
1.1 jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoader.java
Index: ServerSessionPoolLoader.java
===================================================================
/*
* Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.jboss.jms.asf;
import javax.management.ObjectName;
import javax.management.MBeanServer;
import javax.naming.Context;
import javax.naming.Name;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.naming.NameNotFoundException;
import org.jboss.util.ServiceMBeanSupport;
import org.jboss.logging.Logger;
/**
* ServerSessionPoolLoader.java
*
*
* Created: Wed Nov 29 16:14:46 2000
*
* @author
* @version
*/
public class ServerSessionPoolLoader extends ServiceMBeanSupport
implements ServerSessionPoolLoaderMBean{
private ServerSessionPoolFactory poolFactory;
public ServerSessionPoolLoader(String name, String poolFactoryClass) {
try {
Class cls = Class.forName(poolFactoryClass);
poolFactory = (ServerSessionPoolFactory)cls.newInstance();
} catch(Exception e) {
Logger.exception(e);
throw new RuntimeException("Unable to initialize
ServerSessionPoolFactory '"+name+"': "+e);
}
poolFactory.setName(name);
}
public ObjectName getObjectName(MBeanServer parm1, ObjectName parm2) throws
javax.management.MalformedObjectNameException {
return (parm2 == null) ? new
ObjectName(OBJECT_NAME+",name="+poolFactory.getName()) : parm2;
}
public String getName() {
return poolFactory.getName();
}
public void startService() throws Exception {
initializeAdapter();
}
public void stopService() {
// Unbind from JNDI
try {
String name = poolFactory.getName();
new InitialContext().unbind("java:/"+name);
log.log("JMA Provider Adapter "+name+" removed from JNDI");
//source.close();
//log.log("XA Connection pool "+name+" shut down");
} catch (NamingException e) {
// Ignore
}
}
// Private -------------------------------------------------------
private void initializeAdapter() throws NamingException {
Context ctx = null;
Object mgr = null;
/*
source.setTransactionManagerJNDIName("java:/TransactionManager");
try {
ctx = new InitialContext();
mgr = ctx.lookup("java:/TransactionManager");
} catch(NamingException e) {
throw new IllegalStateException("Cannot start XA Connection Pool; there
is no TransactionManager in JNDI!");
}
source.initialize();
*/
// Bind in JNDI
bind(new InitialContext(), "java:/"+poolFactory.getName(),poolFactory);
log.log("JMS provider Adapter "+poolFactory.getName()+" bound to
java:/"+poolFactory.getName());
}
private void bind(Context ctx, String name, Object val) throws NamingException {
// Bind val to name in ctx, and make sure that all intermediate contexts
exist
Name n = ctx.getNameParser("").parse(name);
while (n.size() > 1) {
String ctxName = n.get(0);
try {
ctx = (Context)ctx.lookup(ctxName);
} catch (NameNotFoundException e) {
ctx = ctx.createSubcontext(ctxName);
}
n = n.getSuffix(1);
}
ctx.bind(n.get(0), val);
}
public static void main(String[] args) {
}
} // ServerSessionPoolLoader
1.1
jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoaderMBean.java
Index: ServerSessionPoolLoaderMBean.java
===================================================================
/*
* Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.jboss.jms.asf;
import org.jboss.util.ServiceMBean;
/**
 * Management interface of the server session pool loader service.
 *
 * Created: Wed Nov 29 16:20:17 2000
 *
 * @author
 * @version
 */
public interface ServerSessionPoolLoaderMBean extends ServiceMBean {

   /**
    * Key property part of the JMX object name used for this service; a
    * <code>name</code> property is appended by the loader.
    */
   String OBJECT_NAME = ":service=ServerSessionPoolMBean";

} // ServerSessionPoolLoaderMBean
1.1 jboss/src/main/org/jboss/jms/asf/ThreadPool.java
Index: ThreadPool.java
===================================================================
/*
* jBoss, the OpenSource EJB server
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
import java.util.Stack;
import org.jboss.logging.Logger;
/**
 * Thread pool for JMS ASF.
 *
 * <p>Work is handed to an idle {@link Worker} if one is pooled, otherwise a
 * new worker thread is created. After finishing a piece of work a worker
 * puts itself back into the pool, unless the pool already holds the maximum
 * number of idle workers, in which case the worker terminates.
 *
 * <p>Per the original author's note this is derived from the thread pool in
 * jboss.web, with modifications taken from Paul Hyde's Java Thread
 * Programming (SAMS).
 *
 * @see <related>
 * @author Rickard Öberg ([EMAIL PROTECTED])
 * @author Peter Antman ([EMAIL PROTECTED])
 * @version $Revision: 1.1 $
 */
public class ThreadPool
{
   // Constants -----------------------------------------------------

   // Attributes ----------------------------------------------------
   /** Idle workers waiting for work. Accessed under this pool's monitor. */
   Stack pool = new Stack();

   /** Maximum number of idle workers kept in {@link #pool}. */
   int maxSize = 10;

   // Static --------------------------------------------------------

   // Constructors --------------------------------------------------
   public ThreadPool()
   {
   }

   // Public --------------------------------------------------------

   /**
    * Removes all idle workers from the pool and asks them to terminate.
    * Workers that are currently executing work are not affected.
    *
    * <p>The workers are shut down cooperatively (flag plus interrupt)
    * instead of with the deprecated and unsafe <code>Thread.stop()</code>.
    */
   public synchronized void clear()
   {
      while (!pool.empty())
      {
         Worker w = (Worker)pool.pop();
         w.die();
         // Wake the worker up from its wait for work so it sees the flag.
         w.interrupt();
      }
   }

   /**
    * Sets the maximum number of idle workers kept in the pool.
    *
    * @param size the new maximum
    */
   public synchronized void setMaximumSize(int size)
   {
      maxSize = size;
   }

   /**
    * Executes the given work on a pooled worker, or on a newly created one
    * when no idle worker is available.
    *
    * @param work the work to execute
    */
   public synchronized void run(Runnable work)
   {
      // FIXME We should probably have a maximum of threads allowed to be
      // created too. Here we just naively hand out new ones.
      Worker w = pool.empty() ? new Worker() : (Worker)pool.pop();
      w.run(work);
   }

   // Package protected ---------------------------------------------

   /**
    * Puts an idle worker back into the pool, or tells it to terminate when
    * the pool is already full.
    *
    * @param w the worker that finished its work
    */
   synchronized void returnWorker(Worker w)
   {
      if (pool.size() < maxSize)
      {
         pool.push(w);
      }
      else
      {
         w.die();
      }
   }

   /**
    * Makes sure a terminating worker is no longer handed out by the pool.
    *
    * @param w the worker that is about to terminate
    */
   synchronized void discardWorker(Worker w)
   {
      pool.removeElement(w);
   }

   // Inner classes -------------------------------------------------

   /**
    * A pooled thread. It waits on its {@link Slot} for work, executes it
    * and then returns itself to the pool.
    */
   class Worker
      extends Thread
   {
      /** Cleared to make the worker leave its loop; read without a lock. */
      volatile boolean running = true;

      /** Hand-over point for the next piece of work. */
      Slot slot = new Slot();

      /** Creates the worker and immediately starts its thread. */
      Worker()
      {
         start();
      }

      /** Asks the worker to terminate once it is done with its current work. */
      public synchronized void die()
      {
         running = false;
      }

      /**
       * Hands a piece of work to this worker.
       *
       * @param runner the work to execute on this worker's thread
       */
      public synchronized void run(Runnable runner)
      {
         try {
            slot.add(runner);
         } catch (InterruptedException e) {
            // Can only happen while waiting for a full slot, which would
            // mean something is wrong with the pooling. Keep the caller's
            // interrupt status instead of silently swallowing it.
            Thread.currentThread().interrupt();
         }
      }

      /**
       * Worker loop: wait for work, run it, return to the pool. The loop
       * ends when the worker is told to die or is interrupted while idle.
       */
      public void run()
      {
         while (running)
         {
            Runnable r;
            try {
               // Block on the slot until work arrives
               r = slot.remove();
            } catch (InterruptedException e) {
               // Interrupted while idle: treat it as a shutdown request.
               // Looping again would fail immediately and push this worker
               // into the pool over and over.
               break;
            }

            // Should never be null, but be defensive
            if (r != null)
            {
               try {
                  r.run();
               } catch (Exception e) {
                  Logger.exception(e);
               } finally {
                  // Do not carry an interrupt aimed at the work over to
                  // the next piece of work.
                  Thread.interrupted();
               }
            }

            if (running)
            {
               returnWorker(this);
            }
         }

         // A worker that has left its loop must not stay in the pool, work
         // handed to it would never be executed.
         running = false;
         discardWorker(this);
      }
   }

   /**
    * A synchronized cubby hole - or Slot; only one thing can ever be in
    * there, and it can be used to synchronize and wait on.
    *
    * This is a trimmed down version of the ObjectFIFO in
    * Java Thread Programming.
    */
   class Slot {
      /** The single piece of work held, or <code>null</code> when empty. */
      Runnable slot;

      /** Number of items held: 0 or 1. */
      int size = 0;

      /**
       * Stores a piece of work, waiting while the slot is occupied.
       *
       * @param r the work to store
       * @throws InterruptedException if interrupted while waiting
       */
      public synchronized void add(Runnable r) throws InterruptedException {
         waitWhileFull();
         slot = r;
         size = 1;
         this.notifyAll();
      }

      /**
       * Takes the piece of work out, waiting while the slot is empty.
       *
       * @return the stored work
       * @throws InterruptedException if interrupted while waiting
       */
      public synchronized Runnable remove() throws InterruptedException {
         waitWhileEmpty();
         Runnable r = slot;
         slot = null;
         size = 0;
         this.notifyAll();
         return r;
      }

      /**
       * Blocks until the slot is empty.
       *
       * @throws InterruptedException if interrupted while waiting
       */
      public synchronized void waitWhileFull() throws InterruptedException {
         while (isFull()) {
            this.wait();
         }
      }

      /**
       * Blocks until the slot holds something.
       *
       * @throws InterruptedException if interrupted while waiting
       */
      public synchronized void waitWhileEmpty() throws InterruptedException {
         while (isEmpty()) {
            this.wait();
         }
      }

      /** @return <code>true</code> if nothing is stored */
      public synchronized boolean isEmpty() {
         return (size == 0);
      }

      /** @return <code>true</code> if something is stored */
      public synchronized boolean isFull() {
         return (size == 1);
      }
   }
}