User: dmaplesden
Date: 01/11/13 17:53:40
Modified: src/main/org/jboss/mq SpyBytesMessage.java
SpyEncapsulatedMessage.java SpyMapMessage.java
SpyMessage.java SpyObjectMessage.java
SpyQueueSender.java SpySession.java
SpyStreamMessage.java SpyTextMessage.java
SpyTopicPublisher.java SpyXAResourceManager.java
Added: src/main/org/jboss/mq MessagePool.java
Log:
Added message object pool and changed file PM message log to use generic
SpyMessage.writeMessage and readMessage methods.
Revision Changes Path
1.5 +2 -2 jbossmq/src/main/org/jboss/mq/SpyBytesMessage.java
Index: SpyBytesMessage.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyBytesMessage.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyBytesMessage.java 2001/10/28 04:07:34 1.4
+++ SpyBytesMessage.java 2001/11/14 01:53:39 1.5
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyBytesMessage
extends SpyMessage
@@ -402,7 +402,7 @@
public SpyMessage myClone()
throws JMSException {
- SpyBytesMessage result = new SpyBytesMessage();
+ SpyBytesMessage result = MessagePool.getBytesMessage();
this.reset();
result.copyProps( this );
if ( this.InternalArray != null ) {
1.4 +2 -2 jbossmq/src/main/org/jboss/mq/SpyEncapsulatedMessage.java
Index: SpyEncapsulatedMessage.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyEncapsulatedMessage.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyEncapsulatedMessage.java 2001/10/28 04:07:34 1.3
+++ SpyEncapsulatedMessage.java 2001/11/14 01:53:39 1.4
@@ -15,7 +15,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyEncapsulatedMessage
extends SpyObjectMessage {
@@ -54,7 +54,7 @@
public SpyMessage myClone()
throws javax.jms.JMSException {
- SpyEncapsulatedMessage result = new SpyEncapsulatedMessage();
+ SpyEncapsulatedMessage result = MessagePool.getEncapsulatedMessage();
result.copyProps( this );
//HACK to get around read only problem
boolean readOnly = result.header.msgReadOnly;
1.4 +2 -2 jbossmq/src/main/org/jboss/mq/SpyMapMessage.java
Index: SpyMapMessage.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMapMessage.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyMapMessage.java 2001/10/28 04:07:34 1.3
+++ SpyMapMessage.java 2001/11/14 01:53:39 1.4
@@ -20,7 +20,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyMapMessage
extends SpyMessage
@@ -373,7 +373,7 @@
public SpyMessage myClone()
throws JMSException {
- SpyMapMessage result = new SpyMapMessage();
+ SpyMapMessage result = MessagePool.getMapMessage();
result.copyProps( this );
result.content = ( Hashtable )this.content.clone();
return result;
1.10 +44 -11 jbossmq/src/main/org/jboss/mq/SpyMessage.java
Index: SpyMessage.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessage.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- SpyMessage.java 2001/11/10 21:38:04 1.9
+++ SpyMessage.java 2001/11/14 01:53:40 1.10
@@ -25,7 +25,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author David Maplesden ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SpyMessage implements Serializable, Message, Comparable,
Externalizable {
@@ -68,7 +68,7 @@
//For ordering in the JMSServerQueue (set on the server side)
public transient long messageId;
}
-
+
public Header header = new Header();
public transient AcknowledgementRequest ack;
@@ -445,7 +445,7 @@
}
public SpyMessage myClone() throws JMSException {
- SpyMessage result = new SpyMessage();
+ SpyMessage result = MessagePool.getMessage();
result.copyProps(this);
return result;
}
@@ -541,7 +541,7 @@
protected static final byte ENCAP_MESS = 6;
protected static final byte SPY_MESS = 7;
protected static final int BYTE = 0;
-
+
protected static final int SHORT = 1;
protected static final int INT = 2;
protected static final int LONG = 3;
@@ -576,25 +576,25 @@
byte type = in.readByte();
switch (type) {
case OBJECT_MESS :
- message = new SpyObjectMessage();
+ message = MessagePool.getObjectMessage();
break;
case BYTES_MESS :
- message = new SpyBytesMessage();
+ message = MessagePool.getBytesMessage();
break;
case MAP_MESS :
- message = new SpyMapMessage();
+ message = MessagePool.getMapMessage();
break;
case STREAM_MESS :
- message = new SpyStreamMessage();
+ message = MessagePool.getStreamMessage();
break;
case TEXT_MESS :
- message = new SpyTextMessage();
+ message = MessagePool.getTextMessage();
break;
case ENCAP_MESS :
- message = new SpyEncapsulatedMessage();
+ message = MessagePool.getEncapsulatedMessage();
break;
default :
- message = new SpyMessage();
+ message = MessagePool.getMessage();
}
try {
message.readExternal(in);
@@ -742,6 +742,39 @@
}
header.jmsProperties.put(key, value);
}
+ }
+
+ //clear for next use in pool
+ void reset() throws JMSException{
+ clearBody();
+ this.ack = null;
+ this.session = null;
+ //Set by send() method
+ this.header.jmsDestination = null;
+ this.header.jmsDeliveryMode = -1;
+ this.header.jmsExpiration = 0;
+ this.header.jmsPriority = -1;
+ this.header.jmsMessageID = null;
+ this.header.jmsTimeStamp = 0;
+ //Set by the client
+ this.header.jmsCorrelationID = true;
+ this.header.jmsCorrelationIDString = null;
+ this.header.jmsCorrelationIDbyte = null;
+ this.header.jmsReplyTo = null;
+ this.header.jmsType = null;
+ //Set by the provider
+ this.header.jmsRedelivered = false;
+ //Properties
+ this.header.jmsProperties.clear();
+ this.header.jmsPropertiesReadWrite=true;
+ //Message body
+ this.header.msgReadOnly = false;
+ //For noLocal to be able to tell if this was a locally produced message
+ this.header.producerClientId = null;
+ //For durable subscriptions
+ this.header.durableSubscriberID = null;
+ //For ordering in the JMSServerQueue (set on the server side)
+ this.header.messageId = 0;
}
}
1.7 +5 -5 jbossmq/src/main/org/jboss/mq/SpyObjectMessage.java
Index: SpyObjectMessage.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyObjectMessage.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SpyObjectMessage.java 2001/10/28 04:07:34 1.6
+++ SpyObjectMessage.java 2001/11/14 01:53:40 1.7
@@ -17,7 +17,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class SpyObjectMessage
extends SpyMessage
@@ -67,9 +67,9 @@
} else {
/**
- * Default implementation ObjectInputStream does not work
well
- * when running an a micro kernal style app-server like
JBoss.
- * We need to look for the Class in the context class loader
+ * Default implementation ObjectInputStream does not work
well
+ * when running an a micro kernal style app-server like
JBoss.
+ * We need to look for the Class in the context class loader
* and not in the System classloader.
*
* Would this be done better by using a MarshaedObject??
@@ -104,7 +104,7 @@
public SpyMessage myClone()
throws JMSException {
- SpyObjectMessage result = new SpyObjectMessage();
+ SpyObjectMessage result = MessagePool.getObjectMessage();
result.copyProps( this );
result.isByteArray = this.isByteArray;
if ( objectBytes != null ) {
1.3 +2 -2 jbossmq/src/main/org/jboss/mq/SpyQueueSender.java
Index: SpyQueueSender.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyQueueSender.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyQueueSender.java 2001/08/17 03:04:01 1.2
+++ SpyQueueSender.java 2001/11/14 01:53:40 1.3
@@ -20,7 +20,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyQueueSender
extends SpyMessageProducer
@@ -94,7 +94,7 @@
// Encapsulate the message if not a SpyMessage
if ( !( message instanceof SpyMessage ) ) {
- SpyEncapsulatedMessage m = new SpyEncapsulatedMessage();
+ SpyEncapsulatedMessage m = MessagePool.getEncapsulatedMessage();
m.setMessage( message );
message = m;
}
1.7 +15 -15 jbossmq/src/main/org/jboss/mq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpySession.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SpySession.java 2001/10/28 04:07:34 1.6
+++ SpySession.java 2001/11/14 01:53:40 1.7
@@ -33,7 +33,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public abstract class SpySession
implements Session, XASession {
@@ -130,7 +130,7 @@
throw new IllegalStateException( "The session is closed" );
}
- SpyBytesMessage message = new SpyBytesMessage();
+ SpyBytesMessage message = MessagePool.getBytesMessage();
message.header.producerClientId = connection.getClientID();
return message;
}
@@ -141,7 +141,7 @@
throw new IllegalStateException( "The session is closed" );
}
- SpyMapMessage message = new SpyMapMessage();
+ SpyMapMessage message = MessagePool.getMapMessage();
message.header.producerClientId = connection.getClientID();
return message;
}
@@ -152,7 +152,7 @@
throw new IllegalStateException( "The session is closed" );
}
- SpyMessage message = new SpyMessage();
+ SpyMessage message = MessagePool.getMessage();
message.header.producerClientId = connection.getClientID();
return message;
}
@@ -163,7 +163,7 @@
throw new IllegalStateException( "The session is closed" );
}
- SpyObjectMessage message = new SpyObjectMessage();
+ SpyObjectMessage message = MessagePool.getObjectMessage();
message.header.producerClientId = connection.getClientID();
return message;
}
@@ -174,7 +174,7 @@
throw new IllegalStateException( "The session is closed" );
}
- SpyObjectMessage message = new SpyObjectMessage();
+ SpyObjectMessage message = MessagePool.getObjectMessage();
message.setObject( object );
message.header.producerClientId = connection.getClientID();
return message;
@@ -186,7 +186,7 @@
throw new IllegalStateException( "The session is closed" );
}
- SpyStreamMessage message = new SpyStreamMessage();
+ SpyStreamMessage message = MessagePool.getStreamMessage();
message.header.producerClientId = connection.getClientID();
return message;
}
@@ -197,7 +197,7 @@
throw new IllegalStateException( "The session is closed" );
}
- SpyTextMessage message = new SpyTextMessage();
+ SpyTextMessage message = MessagePool.getTextMessage();
message.header.producerClientId = connection.getClientID();
return message;
}
@@ -236,25 +236,25 @@
Iterator i;
synchronized ( consumers ) {
-
+
//notify the sleeping synchronous listeners
if ( sessionConsumer != null ) {
sessionConsumer.close();
}
-
+
i = consumers.iterator();
}
-
+
while ( i.hasNext() ) {
SpyMessageConsumer messageConsumer = ( SpyMessageConsumer )i.next();
messageConsumer.close();
}
-
+
//deal with any unacked messages
if ( transacted && spyXAResource == null ) {
rollback();
}
-
+
connection.sessionClosing( this );
closed = true;
@@ -312,9 +312,9 @@
if ( !transacted ) {
throw new IllegalStateException( "The session is not transacted" );
}
-
+
cat.debug( "Session: rollback()" );
-
+
// rollback transaction
try {
connection.spyXAResourceManager.endTx( currentTransactionId, true );
1.4 +2 -2 jbossmq/src/main/org/jboss/mq/SpyStreamMessage.java
Index: SpyStreamMessage.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyStreamMessage.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyStreamMessage.java 2001/10/28 04:07:34 1.3
+++ SpyStreamMessage.java 2001/11/14 01:53:40 1.4
@@ -20,7 +20,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyStreamMessage
extends SpyMessage
@@ -490,7 +490,7 @@
public SpyMessage myClone()
throws JMSException {
- SpyStreamMessage result = new SpyStreamMessage();
+ SpyStreamMessage result = MessagePool.getStreamMessage();
result.copyProps( this );
result.content = ( Vector )this.content.clone();
result.position = this.position;
1.7 +4 -4 jbossmq/src/main/org/jboss/mq/SpyTextMessage.java
Index: SpyTextMessage.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyTextMessage.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SpyTextMessage.java 2001/11/04 06:53:31 1.6
+++ SpyTextMessage.java 2001/11/14 01:53:40 1.7
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class SpyTextMessage
extends SpyMessage
@@ -31,7 +31,7 @@
private final static long serialVersionUID = 235726945332013953L;
private final static int chunkSize = 16384;
-
+
// Public --------------------------------------------------------
public void setText( String string )
@@ -60,7 +60,7 @@
public SpyMessage myClone()
throws JMSException
{
- SpyTextMessage result = new SpyTextMessage();
+ SpyTextMessage result = MessagePool.getTextMessage();
result.copyProps( this );
result.content = this.content;
return result;
@@ -76,7 +76,7 @@
content = null;
}
else
- {
+ {
// apply workaround for string > 64K bug in jdk's 1.3.*
// Read the no. of chunks this message is split into, allocate
1.4 +2 -2 jbossmq/src/main/org/jboss/mq/SpyTopicPublisher.java
Index: SpyTopicPublisher.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyTopicPublisher.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyTopicPublisher.java 2001/10/04 23:25:11 1.3
+++ SpyTopicPublisher.java 2001/11/14 01:53:40 1.4
@@ -20,7 +20,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyTopicPublisher
extends SpyMessageProducer
@@ -94,7 +94,7 @@
// Encapsulate the message if not a SpyMessage
if ( !( message instanceof SpyMessage ) ) {
- SpyEncapsulatedMessage m = new SpyEncapsulatedMessage();
+ SpyEncapsulatedMessage m = MessagePool.getEncapsulatedMessage();
m.setMessage( message );
message = m;
}
1.4 +11 -1 jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java
Index: SpyXAResourceManager.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyXAResourceManager.java 2001/08/28 21:38:24 1.3
+++ SpyXAResourceManager.java 2001/11/14 01:53:40 1.4
@@ -20,7 +20,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyXAResourceManager implements java.io.Serializable {
@@ -96,6 +96,11 @@
transaction.acks = job;
}
connection.send( transaction );
+ //release messages back to pool now they are sent
+ if(transaction.messages != null){
+ for(int i=0;i<transaction.messages.length;i++)
+ MessagePool.releaseMessage(transaction.messages[i]);
+ }
} else {
if ( state.txState != TX_PREPARED ) {
throw new XAException( "The transaction had not been prepared" );
@@ -104,6 +109,11 @@
transaction.xid = xid;
transaction.requestType = transaction.TWO_PHASE_COMMIT_COMMIT_REQUEST;
connection.send( transaction );
+ //release messages back to pool now they are sent
+ if(transaction.messages != null){
+ for(int i=0;i<transaction.messages.length;i++)
+ MessagePool.releaseMessage(transaction.messages[i]);
+ }
}
state.txState = TX_COMMITED;
}
1.1 jbossmq/src/main/org/jboss/mq/MessagePool.java
Index: MessagePool.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.mq;
/**
* This class provides a pool of SpyMessages.
*
* This is an very simple implementation first up.
*
* @author David Maplesden ([EMAIL PROTECTED])
*/
public class MessagePool {
//static flag which turns off pooling altogether if false.
public static final boolean POOL = true;
//no point growing pools too much, the max size any one pool will get
public static final int MAX_POOL_SIZE = 10*1000;
protected static java.util.ArrayList messagePool = new java.util.ArrayList();
protected static java.util.ArrayList bytesPool = new java.util.ArrayList();
protected static java.util.ArrayList mapPool = new java.util.ArrayList();
protected static java.util.ArrayList streamPool = new java.util.ArrayList();
protected static java.util.ArrayList objectPool = new java.util.ArrayList();
protected static java.util.ArrayList textPool = new java.util.ArrayList();
protected static java.util.ArrayList encapPool = new java.util.ArrayList();
/**
* Gets a message from the pools.
*/
public static SpyMessage getMessage(){
if(POOL){
synchronized(messagePool){
if(!messagePool.isEmpty())
return (SpyMessage) messagePool.remove(messagePool.size()-1);
}
}
return new SpyMessage();
}
/**
* Gets a bytes message from the pools.
*/
public static SpyBytesMessage getBytesMessage(){
if(POOL){
synchronized(bytesPool){
if(!bytesPool.isEmpty())
return (SpyBytesMessage) bytesPool.remove(bytesPool.size()-1);
}
}
return new SpyBytesMessage();
}
/**
* Gets a map message from the pools.
*/
public static SpyMapMessage getMapMessage(){
if(POOL){
synchronized(mapPool){
if(!mapPool.isEmpty())
return (SpyMapMessage) mapPool.remove(mapPool.size()-1);
}
}
return new SpyMapMessage();
}
/**
* Gets a stream message from the pools.
*/
public static SpyStreamMessage getStreamMessage(){
if(POOL){
synchronized(streamPool){
if(!streamPool.isEmpty())
return (SpyStreamMessage) streamPool.remove(streamPool.size()-1);
}
}
return new SpyStreamMessage();
}
/**
* Gets a object message from the pools.
*/
public static SpyObjectMessage getObjectMessage(){
if(POOL){
synchronized(objectPool){
if(!objectPool.isEmpty())
return (SpyObjectMessage) objectPool.remove(objectPool.size()-1);
}
}
return new SpyObjectMessage();
}
/**
* Gets a text message from the pools.
*/
public static SpyTextMessage getTextMessage(){
if(POOL){
synchronized(textPool){
if(!textPool.isEmpty())
return (SpyTextMessage) textPool.remove(textPool.size()-1);
}
}
return new SpyTextMessage();
}
/**
* Gets a encapsulated message from the pools.
*/
public static SpyEncapsulatedMessage getEncapsulatedMessage(){
if(POOL){
synchronized(encapPool){
if(!encapPool.isEmpty())
return (SpyEncapsulatedMessage) encapPool.remove(encapPool.size()-1);
}
}
return new SpyEncapsulatedMessage();
}
/**
* Releases a SpyMessage back to the pools for reuse.
*/
public static void releaseMessage(SpyMessage message){
if(!POOL){
return;
}else{
if(message == null)
return;
try{
message.reset();
}catch(javax.jms.JMSException e){
//unable to re-use message
return;
}
if(message instanceof SpyTextMessage){
synchronized(textPool){
if(textPool.size() < MAX_POOL_SIZE)
textPool.add(message);
}
}else if(message instanceof SpyEncapsulatedMessage){
//must test for encap mess before object mess because encap mess extends
object mess
synchronized(encapPool){
if(encapPool.size() < MAX_POOL_SIZE)
encapPool.add(message);
}
}else if(message instanceof SpyObjectMessage){
synchronized(objectPool){
if(objectPool.size() < MAX_POOL_SIZE)
objectPool.add(message);
}
}else if(message instanceof SpyBytesMessage){
synchronized(bytesPool){
if(bytesPool.size() < MAX_POOL_SIZE)
bytesPool.add(message);
}
}else if(message instanceof SpyMapMessage){
synchronized(mapPool){
if(mapPool.size() < MAX_POOL_SIZE)
mapPool.add(message);
}
}else if(message instanceof SpyStreamMessage){
synchronized(streamPool){
if(streamPool.size() < MAX_POOL_SIZE)
streamPool.add(message);
}
}else{
//plain old SpyMessage
synchronized(messagePool){
if(messagePool.size() < MAX_POOL_SIZE)
messagePool.add(message);
}
}
}
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development