User: user57
Date: 01/07/31 22:19:47
Modified: src/main/org/jbossmq Tag: jboss_buildmagic Connection.java
SpyBytesMessage.java SpyConnection.java
SpyMessage.java SpyMessageConsumer.java
SpyQueueSender.java SpyQueueSession.java
SpySession.java SpyTopicSession.java
SpyXAConnection.java Subscription.java
Log:
o updated from HEAD
Revision Changes Path
No revision
No revision
1.3.2.1 +15 -21 jbossmq/src/main/org/jbossmq/Connection.java
Index: Connection.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/Connection.java,v
retrieving revision 1.3
retrieving revision 1.3.2.1
diff -u -r1.3 -r1.3.2.1
--- Connection.java 2001/07/28 00:30:15 1.3
+++ Connection.java 2001/08/01 05:19:47 1.3.2.1
@@ -146,8 +146,6 @@
} catch (Exception e) {
throw new SpyJMSException( "Cannot enable the connection with the JMS server",e);
}
-
- changeModeStop(modeStop);
}
public void stop() throws JMSException {
@@ -166,8 +164,6 @@
throw new SpyJMSException( "Cannot disable the connection with the JMS server",e);
}
- changeModeStop(modeStop);
-
}
public synchronized void close() throws JMSException {
@@ -247,19 +243,6 @@
}
}
- //notify his sessions that he has changed his stopped mode
- synchronized void changeModeStop(boolean newValue) {
- synchronized (createdSessions) {
-
- Iterator i = createdSessions.iterator();
- while (i.hasNext()) {
- ((SpySession) i.next()).setStopMode(newValue);
- }
-
- }
-
- }
-
//Called by a session when it is closing
void sessionClosing(SpySession who) {
synchronized (createdSessions) {
@@ -278,7 +261,8 @@
try {
clientID = serverIL.getID();
} catch (Exception e) {
- throw new SpyJMSException( "Cannot get a client ID",e);
+ cat.debug("Server Exception: ", e);
+ throw new SpyJMSException( "Cannot get a client ID: "+e.getMessage(), e );
}
}
@@ -398,7 +382,12 @@
throw new IllegalStateException("The connection is closed");
try {
- return serverIL.receive(connectionToken, sub.subscriptionId, wait);
+ SpyMessage message = serverIL.receive(connectionToken, sub.subscriptionId, wait);
+ if( message != null ) {
+ message.shouldAck = true;
+ message.routeToSubscriber = sub.subscriptionId;
+ }
+ return message;
} catch (Exception e) {
throw new SpyJMSException( "Cannot create a ConnectionReceiver",e);
}
@@ -620,7 +609,12 @@
while( iter.hasNext() ) {
SpyConsumer consumer = (SpyConsumer)iter.next();
- consumer.addMessage(requests[i].message);
+ if( iter.hasNext() ) {
+ consumer.addMessage(requests[i].message.myClone());
+ }
+ else {
+ consumer.addMessage(requests[i].message);
+ }
consumersUsed.add(consumer);
}
1.4.4.1 +126 -116 jbossmq/src/main/org/jbossmq/SpyBytesMessage.java
Index: SpyBytesMessage.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyBytesMessage.java,v
retrieving revision 1.4
retrieving revision 1.4.4.1
diff -u -r1.4 -r1.4.4.1
--- SpyBytesMessage.java 2001/05/17 23:46:29 1.4
+++ SpyBytesMessage.java 2001/08/01 05:19:47 1.4.4.1
@@ -20,26 +20,26 @@
/**
* This class implements javax.jms.BytesMessage
- *
+ *
* @author Norbert Lataille ([EMAIL PROTECTED])
- *
+ *
* @version $Revision$
*/
-public class SpyBytesMessage
- extends SpyMessage
+public class SpyBytesMessage
+ extends SpyMessage
implements Cloneable, BytesMessage
{
-
+
// Attributes ----------------------------------------------------
- private ByteArrayOutputStream ostream=null;
- private DataOutputStream p=null;
- private byte[] InternalArray=null;
- private ByteArrayInputStream istream=null;
- private DataInputStream m=null;
+ private transient ByteArrayOutputStream ostream=null;
+ private transient DataOutputStream p=null;
+ private transient ByteArrayInputStream istream=null;
+ private transient DataInputStream m=null;
+ private byte[] InternalArray=null;
// Constructor ---------------------------------------------------
-
+
public SpyBytesMessage()
{
msgReadOnly=false;
@@ -49,272 +49,272 @@
// Public --------------------------------------------------------
- public boolean readBoolean() throws JMSException
+ public boolean readBoolean() throws JMSException
{
checkRead();
- try {
+ try {
return m.readBoolean();
} catch (EOFException e) {
throw new MessageEOFException("");
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public byte readByte() throws JMSException
+ public byte readByte() throws JMSException
{
- checkRead();
- try {
+ checkRead();
+ try {
return m.readByte();
} catch (EOFException e) {
throw new MessageEOFException("");
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public int readUnsignedByte() throws JMSException
+ public int readUnsignedByte() throws JMSException
{
checkRead();
- try {
+ try {
return m.readUnsignedByte();
} catch (EOFException e) {
throw new MessageEOFException("");
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public short readShort() throws JMSException
+ public short readShort() throws JMSException
{
checkRead();
- try {
+ try {
return m.readShort();
} catch (EOFException e) {
throw new MessageEOFException("");
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
-
- public int readUnsignedShort() throws JMSException
+
+ public int readUnsignedShort() throws JMSException
{
checkRead();
- try {
+ try {
return m.readUnsignedShort();
} catch (EOFException e) {
throw new MessageEOFException("");
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public char readChar() throws JMSException
+ public char readChar() throws JMSException
{
checkRead();
- try {
+ try {
return m.readChar();
} catch (EOFException e) {
throw new MessageEOFException("");
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public int readInt() throws JMSException
+ public int readInt() throws JMSException
{
checkRead();
- try {
+ try {
return m.readInt();
} catch (EOFException e) {
throw new MessageEOFException("");
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public long readLong() throws JMSException
+ public long readLong() throws JMSException
{
checkRead();
- try {
+ try {
return m.readLong();
} catch (EOFException e) {
throw new MessageEOFException("");
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public float readFloat() throws JMSException
+ public float readFloat() throws JMSException
{
checkRead();
- try {
+ try {
return m.readFloat();
} catch (EOFException e) {
throw new MessageEOFException("");
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
-
- public double readDouble() throws JMSException
+
+ public double readDouble() throws JMSException
{
checkRead();
- try {
+ try {
return m.readDouble();
} catch (EOFException e) {
throw new MessageEOFException("");
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public String readUTF() throws JMSException
+ public String readUTF() throws JMSException
{
checkRead();
- try {
+ try {
return m.readUTF();
} catch (EOFException e) {
throw new MessageEOFException("");
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
-
- public int readBytes(byte[] value) throws JMSException
+
+ public int readBytes(byte[] value) throws JMSException
{
checkRead();
- try {
+ try {
return m.read(value);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public int readBytes(byte[] value, int length) throws JMSException
+ public int readBytes(byte[] value, int length) throws JMSException
{
checkRead();
- try {
+ try {
return m.read(value,0,length);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public void writeBoolean(boolean value) throws JMSException
+ public void writeBoolean(boolean value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
p.writeBoolean(value);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public void writeByte(byte value) throws JMSException
+ public void writeByte(byte value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
p.writeByte(value);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public void writeShort(short value) throws JMSException
+ public void writeShort(short value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
p.writeShort(value);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
-
- public void writeChar(char value) throws JMSException
+
+ public void writeChar(char value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
p.writeChar(value);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
-
- public void writeInt(int value) throws JMSException
+
+ public void writeInt(int value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
p.writeInt(value);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public void writeLong(long value) throws JMSException
+ public void writeLong(long value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
p.writeLong(value);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public void writeFloat(float value) throws JMSException
+ public void writeFloat(float value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
p.writeFloat(value);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public void writeDouble(double value) throws JMSException
+ public void writeDouble(double value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
p.writeDouble(value);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public void writeUTF(String value) throws JMSException
+ public void writeUTF(String value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
p.writeUTF(value);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
- public void writeBytes(byte[] value) throws JMSException
+ public void writeBytes(byte[] value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
p.write(value,0,value.length);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
-
- public void writeBytes(byte[] value, int offset, int length) throws
JMSException
+
+ public void writeBytes(byte[] value, int offset, int length) throws
JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
p.write(value,offset,length);
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
-
- public void writeObject(Object value) throws JMSException
+
+ public void writeObject(Object value) throws JMSException
{
if (msgReadOnly) throw new MessageNotWriteableException("the message
body is read-only");
- try {
+ try {
if (value instanceof String) p.writeChars((String)value);
else if (value instanceof Boolean)
p.writeBoolean(((Boolean)value).booleanValue());
else if (value instanceof Byte)
p.writeByte(((Byte)value).byteValue());
@@ -324,16 +324,16 @@
else if (value instanceof Float)
p.writeFloat(((Float)value).floatValue());
else if (value instanceof Double)
p.writeDouble(((Double)value).doubleValue());
else if (value instanceof byte[])
p.write((byte[])value,0,((byte[])value).length);
- else throw new MessageFormatException("Invalid object for properties");
+ else throw new MessageFormatException("Invalid object for properties");
} catch (IOException e) {
throw new JMSException("IOException");
- }
-
- }
+ }
- public void reset() throws JMSException
+ }
+
+ public void reset() throws JMSException
{
- try {
+ try {
if (!msgReadOnly) {
p.flush();
InternalArray=ostream.toByteArray();
@@ -346,50 +346,60 @@
msgReadOnly = true;
} catch (IOException e) {
throw new JMSException("IOException");
- }
+ }
}
-
+
public void clearBody() throws JMSException
{
- try {
+ try {
if (!msgReadOnly) ostream.close();
else istream.close();
} catch (IOException e) {
//don't throw an exception
- }
-
+ }
+
ostream=new ByteArrayOutputStream();
p=new DataOutputStream(ostream);
- InternalArray=null;
+ InternalArray=null;
istream=null;
m=null;
-
+
super.clearBody();
}
- // Package protected ---------------------------------------------
-
+ // Package protected ---------------------------------------------
+
//We need to reset() since this message is going to be cloned/serialized
public SpyMessage myClone()
{
try {
- reset();
+ reset();
return (SpyMessage)clone();
- } catch (Exception e) {
+ } catch (Exception e) {
throw new RuntimeException("myClone failed !");
- }
+ }
}
-
+
// Private -------------------------------------------------------
+ private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+ if (!msgReadOnly) {
+ p.flush();
+ InternalArray=ostream.toByteArray();
+ }
+ out.writeObject(InternalArray);
+ }
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ InternalArray = (byte[])in.readObject();
+ }
- private void checkRead() throws JMSException
+ private void checkRead() throws JMSException
{
if (!msgReadOnly) throw new MessageNotWriteableException("readByte
while the buffer is writeonly");
-
+
//We have just received/reset() the message, and the client is trying
to read it
if (istream==null||m==null) {
istream = new ByteArrayInputStream(InternalArray);
- m = new DataInputStream(istream);
+ m = new DataInputStream(istream);
}
}
1.13.2.1 +3 -3 jbossmq/src/main/org/jbossmq/SpyConnection.java
Index: SpyConnection.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyConnection.java,v
retrieving revision 1.13
retrieving revision 1.13.2.1
diff -u -r1.13 -r1.13.2.1
--- SpyConnection.java 2001/07/28 00:30:15 1.13
+++ SpyConnection.java 2001/08/01 05:19:47 1.13.2.1
@@ -47,7 +47,7 @@
{
if (closed) throw new IllegalStateException("The connection is
closed");
- TopicSession session=new SpyTopicSession(this,transacted,acknowledgeMode,modeStop);
+ TopicSession session=new SpyTopicSession(this,transacted,acknowledgeMode);
//add the new session to the createdSessions list
synchronized (createdSessions) {
@@ -146,7 +146,7 @@
{
if (closed) throw new IllegalStateException("The connection is
closed");
- QueueSession session=new SpyQueueSession(this,transacted,acknowledgeMode,modeStop);
+ QueueSession session=new SpyQueueSession(this,transacted,acknowledgeMode);
//add the new session to the createdSessions list
synchronized (createdSessions) {
1.8.2.1 +3 -1 jbossmq/src/main/org/jbossmq/SpyMessage.java
Index: SpyMessage.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyMessage.java,v
retrieving revision 1.8
retrieving revision 1.8.2.1
diff -u -r1.8 -r1.8.2.1
--- SpyMessage.java 2001/07/28 00:30:15 1.8
+++ SpyMessage.java 2001/08/01 05:19:47 1.8.2.1
@@ -67,6 +67,8 @@
public transient boolean shouldAck;
//For ordering in the JMSServerQueue (set on the server side)
public transient long messageId;
+ //For some persistence mechanisms
+ public transient Object persistData;
// Constructor ---------------------------------------------------
1.8.2.1 +44 -51 jbossmq/src/main/org/jbossmq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyMessageConsumer.java,v
retrieving revision 1.8
retrieving revision 1.8.2.1
diff -u -r1.8 -r1.8.2.1
--- SpyMessageConsumer.java 2001/07/28 00:30:15 1.8
+++ SpyMessageConsumer.java 2001/08/01 05:19:47 1.8.2.1
@@ -82,13 +82,22 @@
if (closed)
throw new IllegalStateException("The MessageConsumer is
closed");
+
if (messageListener != null)
throw new JMSException("A message listener is already
registered");
- subscription.receiving = true;
+ if ( subscription.actsLikeAQueue ) {
+ while( true ) {
+ SpyMessage msg =
session.connection.receive(subscription, 0);
+ if( msg == null )
+ break;
+ Message mes = preProcessMessage( msg );
+ if( mes != null )
+ return mes;
+ }
+ }
- if ( subscription.actsLikeAQueue )
- session.connection.receive(subscription, 0);
+ subscription.receiving = true;
synchronized (messages) {
@@ -96,12 +105,9 @@
while (true) {
if (closed)
return null;
- if (!session.modeStop) {
- Message mes = getMessage();
- if (mes != null)
- return mes;
- } else
- cat.debug("the connection is stopped
!");
+ Message mes = getMessage();
+ if (mes != null)
+ return mes;
cat.debug("SpyMessageConsumer: receive in
messages.wait()");
messages.wait();
@@ -118,20 +124,29 @@
}
public Message receive(long timeOut) throws JMSException {
+ if (timeOut == 0)
+ return receive();
+
if (closed)
throw new IllegalStateException("The MessageConsumer is
closed");
+
if (messageListener != null)
throw new JMSException("A message listener is already
registered");
- if (timeOut == 0)
- return receive();
-
long endTime = System.currentTimeMillis() + timeOut;
- subscription.receiving = true;
+ if ( subscription.actsLikeAQueue ) {
+ while( true ) {
+ SpyMessage msg =
session.connection.receive(subscription, timeOut);
+ if( msg == null )
+ break;
+ Message mes = preProcessMessage( msg );
+ if( mes != null )
+ return mes;
+ }
+ }
- if ( subscription.actsLikeAQueue )
- session.connection.receive(subscription, timeOut);
+ subscription.receiving = true;
synchronized (messages) {
@@ -142,14 +157,10 @@
if (closed)
return null;
- if (!session.modeStop) {
- Message mes = getMessage();
- if (mes != null) {
- return mes;
- }
-
- } else
- cat.debug("the connection is stopped
!");
+ Message mes = getMessage();
+ if (mes != null) {
+ return mes;
+ }
long att = endTime -
System.currentTimeMillis();
if (att <= 0) {
@@ -173,31 +184,18 @@
public Message receiveNoWait() throws JMSException {
if (closed)
throw new IllegalStateException("The MessageConsumer is
closed");
+
if (messageListener != null)
throw new JMSException("A message listener is already
registered");
-
- subscription.receiving = true;
- try {
-
- if ( subscription.actsLikeAQueue ) {
- if (session.modeStop)
- return null;
-
- SpyMessage msg =
session.connection.receive(getSubscription(), -1);
- return preProcessMessage( msg );
- }
-
- synchronized (messages) {
- while (true) {
- if (session.modeStop)
- return null;
- return getMessage();
- }
- }
- } finally {
- subscription.receiving = false;
+ if ( subscription.actsLikeAQueue ) {
+ SpyMessage msg = session.connection.receive(subscription, -1);
+ if( msg == null )
+ return null;
+ return preProcessMessage( msg );
}
+
+ return getMessage();
}
public void close() throws JMSException {
@@ -341,13 +339,8 @@
return null;
SpyMessage mes = (SpyMessage)
messages.removeFirst();
-
- //the SAME Message object is put in different
SessionQueues
- //when we deliver it, we have to clone() it to
insure independance
- //HRC: could we avoid this if we know that we
are delivering in P2P???
- SpyMessage message = mes.myClone();
- Message rc = preProcessMessage( message );
+ Message rc = preProcessMessage( mes);
// could happen if the message has expired.
if( rc == null )
continue;
1.3.4.1 +2 -6 jbossmq/src/main/org/jbossmq/SpyQueueSender.java
Index: SpyQueueSender.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyQueueSender.java,v
retrieving revision 1.3
retrieving revision 1.3.4.1
diff -u -r1.3 -r1.3.4.1
--- SpyQueueSender.java 2001/05/20 23:38:18 1.3
+++ SpyQueueSender.java 2001/08/01 05:19:47 1.3.4.1
@@ -95,11 +95,7 @@
message = m;
}
- // Clone the message so we can make the outbound message read only
- SpyMessage clone = ((SpyMessage)message).myClone();
- clone.setReadOnlyMode();
-
//Send the message.
- session.sendMessage(clone);
+ session.sendMessage((SpyMessage)message);
}
}
1.4.2.1 +5 -5 jbossmq/src/main/org/jbossmq/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyQueueSession.java,v
retrieving revision 1.4
retrieving revision 1.4.2.1
diff -u -r1.4 -r1.4.2.1
--- SpyQueueSession.java 2001/07/16 02:51:44 1.4
+++ SpyQueueSession.java 2001/08/01 05:19:47 1.4.2.1
@@ -102,15 +102,15 @@
}
- SpyQueueSession(Connection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
+ SpyQueueSession(Connection myConnection, boolean transacted, int
acknowledgeMode)
{
- this(myConnection,transacted,acknowledgeMode,stop, false);
+ this(myConnection,transacted,acknowledgeMode,false);
}
// Constructor ---------------------------------------------------
- SpyQueueSession(Connection myConnection, boolean transacted, int
acknowledgeMode, boolean stop, boolean xaSession)
+ SpyQueueSession(Connection myConnection, boolean transacted, int
acknowledgeMode, boolean xaSession)
{
- super(myConnection,transacted,acknowledgeMode,stop, xaSession);
+ super(myConnection,transacted,acknowledgeMode,xaSession);
}
}
1.6.2.1 +4 -17 jbossmq/src/main/org/jbossmq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpySession.java,v
retrieving revision 1.6
retrieving revision 1.6.2.1
diff -u -r1.6 -r1.6.2.1
--- SpySession.java 2001/07/16 02:51:44 1.6
+++ SpySession.java 2001/08/01 05:19:47 1.6.2.1
@@ -54,8 +54,6 @@
//MessageConsumers created by this session
protected HashSet consumers;
- //Is my connection in stopped mode ?
- protected boolean modeStop;
//Is the session closed ?
boolean closed;
@@ -181,7 +179,7 @@
//if we are not in stopped mode, look at the
incoming queue
//Consisder if should be stopped because we
are outside the XA transaction (start/end)
boolean xaStop = spyXAResource!=null &&
currentTransactionId==null;
- if (!(modeStop || xaStop)) {
+ if (!xaStop) {
Iterator i;
synchronized (consumers) {
@@ -338,7 +336,7 @@
throw new IllegalStateException("The session is closed");
if( transacted ) {
- connection.spyXAResourceManager.addMessage(currentTransactionId, m);
+ connection.spyXAResourceManager.addMessage(currentTransactionId, m.myClone());
} else {
connection.sendToServer(m);
}
@@ -350,16 +348,6 @@
return spyXAResource;
}
- //The connection has changed its mode (stop() or start())
- //We have to wait until message delivery has stopped or wake up the thread
- void setStopMode(boolean newValue)
- {
-
- if (closed) throw new IllegalStateException("The session is closed");
- if (modeStop==newValue) return;
- modeStop=newValue;
- }
-
void addConsumer(SpyMessageConsumer who) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
@@ -401,13 +389,12 @@
// Constructor ---------------------------------------------------
- SpySession(Connection conn, boolean trans, int acknowledge, boolean stop,
boolean xaSession)
+ SpySession(Connection conn, boolean trans, int acknowledge, boolean xaSession)
{
connection=conn;
transacted=trans;
acknowledgeMode=acknowledge;
- modeStop=stop;
if( xaSession )
spyXAResource = new SpyXAResource(this);
1.5.2.1 +5 -5 jbossmq/src/main/org/jbossmq/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyTopicSession.java,v
retrieving revision 1.5
retrieving revision 1.5.2.1
diff -u -r1.5 -r1.5.2.1
--- SpyTopicSession.java 2001/07/28 00:30:15 1.5
+++ SpyTopicSession.java 2001/08/01 05:19:47 1.5.2.1
@@ -121,15 +121,15 @@
}
- SpyTopicSession(Connection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
+ SpyTopicSession(Connection myConnection, boolean transacted, int
acknowledgeMode)
{
- this(myConnection,transacted,acknowledgeMode,stop,false);
+ this(myConnection,transacted,acknowledgeMode,false);
}
// Constructor ---------------------------------------------------
- SpyTopicSession(Connection myConnection, boolean transacted, int
acknowledgeMode, boolean stop, boolean xaSession)
+ SpyTopicSession(Connection myConnection, boolean transacted, int
acknowledgeMode, boolean xaSession)
{
- super(myConnection,transacted,acknowledgeMode,stop,xaSession);
+ super(myConnection,transacted,acknowledgeMode,xaSession);
}
}
1.2.2.1 +3 -3 jbossmq/src/main/org/jbossmq/SpyXAConnection.java
Index: SpyXAConnection.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyXAConnection.java,v
retrieving revision 1.2
retrieving revision 1.2.2.1
diff -u -r1.2 -r1.2.2.1
--- SpyXAConnection.java 2001/07/16 02:51:44 1.2
+++ SpyXAConnection.java 2001/08/01 05:19:47 1.2.2.1
@@ -51,7 +51,7 @@
if (closed) throw new IllegalStateException("The connection is
closed");
- XAQueueSession session=new SpyQueueSession(this,true,0,modeStop,true);
+ XAQueueSession session=new SpyQueueSession(this,true,0,true);
//add the new session to the createdSessions list
synchronized (createdSessions) {
@@ -85,7 +85,7 @@
if (closed)
throw new IllegalStateException("The connection is closed");
- XATopicSession session = new SpyTopicSession(this, true, 0, modeStop, true);
+ XATopicSession session = new SpyTopicSession(this, true, 0, true);
//add the new session to the createdSessions list
synchronized (createdSessions) {
createdSessions.add(session);
1.5.2.1 +32 -20 jbossmq/src/main/org/jbossmq/Subscription.java
Index: Subscription.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/Subscription.java,v
retrieving revision 1.5
retrieving revision 1.5.2.1
diff -u -r1.5 -r1.5.2.1
--- Subscription.java 2001/07/28 00:30:15 1.5
+++ Subscription.java 2001/08/01 05:19:47 1.5.2.1
@@ -15,9 +15,9 @@
/**
* This class contians all the data needed to for a the provider to
* to determine if a message can be routed to a consumer.
- *
+ *
* @author Hiram Chirino ([EMAIL PROTECTED])
- *
+ *
* @version $Revision$
*/
public class Subscription
@@ -31,18 +31,18 @@
public String messageSelector;
// Should this message destroy the subscription?
public boolean destroyDurableSubscription;
-
-
// Topics might not want locally produced messages
public boolean noLocal;
+ // Does this subscription look like a queue?
+ public boolean actsLikeAQueue;
- // Transient Values
+ // Transient Values
public transient Selector selector;
public transient ConnectionToken dc;
public transient boolean listening;
public transient boolean receiving;
- // Determines the consumer would accept the message.
+ // Determines the consumer would accept the message.
public boolean accepts( SpyMessage message, boolean exclusive ) throws
javax.jms.JMSException {
Selector ms = getSelector();
@@ -59,39 +59,51 @@
// But if the subscriber is durable, then it acts like a Queue
if( actsLikeAQueue ) {
-
+
if( !exclusive )
return false;
if( !listening && !receiving )
return false;
-
+
}
-
+
} else {
-
+
if( !exclusive )
- return false;
+ return false;
// In the Queue case we only deliver if it is currently
// has a listner or is receiving
if( !listening && !receiving )
return false;
}
-
+
return true;
-
- }
- public boolean actsLikeAQueue;
+ }
- // Determines the consumer would accept the message.
+ // Determines the consumer would accept the message.
public Selector getSelector() throws javax.jms.JMSException {
if( messageSelector == null )
return null;
-
- if( selector==null )
+
+ if( selector==null )
selector = new Selector(messageSelector);
+
+ return selector;
+ }
+
+ public Subscription myClone(){
+ Subscription result = new Subscription();
+
+ //only need to clone non-transient fields for our purposes.
+ result.subscriptionId = subscriptionId;
+ result.destination = destination;
+ result.messageSelector = messageSelector;
+ result.destroyDurableSubscription = destroyDurableSubscription;
+ result.noLocal = noLocal;
+ result.actsLikeAQueue = actsLikeAQueue;
- return selector;
+ return result;
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development