User: hiram
Date: 00/12/31 15:46:33
Modified: src/java/org/spydermq SpyEncapsulatedMessage.java
SpyMessage.java SpyMessageConsumer.java
SpyQueueSender.java SpyTopicPublisher.java
Log:
Implemented Non-optimized message delivery
Revision Changes Path
1.2 +35 -7 spyderMQ/src/java/org/spydermq/SpyEncapsulatedMessage.java
Index: SpyEncapsulatedMessage.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyEncapsulatedMessage.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyEncapsulatedMessage.java 2000/05/31 18:06:43 1.1
+++ SpyEncapsulatedMessage.java 2000/12/31 23:46:32 1.2
@@ -12,18 +12,46 @@
* This Message class is used to send a non 'provider-optimized Message' over the
network [4.4.5]
*
* @author Norbert Lataille ([EMAIL PROTECTED])
+ * @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyEncapsulatedMessage
- extends SpyMessage
-{
+ extends SpyObjectMessage
+{
+
- private Message mes;
-
- SpyEncapsulatedMessage(Message m)
+ SpyEncapsulatedMessage() throws javax.jms.JMSException
{
- //mes=m.clone();
}
-
+
+ public Message getMessage() throws javax.jms.JMSException
+ {
+ Message m = (Message)this.getObject();
+ m.setJMSRedelivered( getJMSRedelivered() );
+ return m;
+ }
+
+ public void setMessage(Message m) throws javax.jms.JMSException
+ {
+ this.setObject((java.io.Serializable)m);
+
+ setJMSCorrelationID(m.getJMSCorrelationID());
+ setJMSCorrelationIDAsBytes(m.getJMSCorrelationIDAsBytes());
+ setJMSReplyTo(m.getJMSReplyTo());
+ setJMSType(m.getJMSType());
+ setJMSDestination( m.getJMSDestination() );
+ setJMSDeliveryMode( m.getJMSDeliveryMode() );
+ setJMSExpiration( m.getJMSExpiration() );
+ setJMSPriority( m.getJMSPriority() );
+ setJMSMessageID( m.getJMSMessageID() );
+ setJMSTimestamp( m.getJMSTimestamp() );
+
+ java.util.Enumeration enum = m.getPropertyNames();
+ while( enum.hasMoreElements() ) {
+ String name = (String)enum.nextElement();
+ Object o = m.getObjectProperty(name);
+ setObjectProperty(name, o);
+ }
+ }
}
1.12 +15 -17 spyderMQ/src/java/org/spydermq/SpyMessage.java
Index: SpyMessage.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessage.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- SpyMessage.java 2000/12/26 04:15:50 1.11
+++ SpyMessage.java 2000/12/31 23:46:32 1.12
@@ -22,7 +22,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.11 $
+ * @version $Revision: 1.12 $
*/
public class SpyMessage
implements Serializable, Cloneable, Message, Comparable
@@ -37,22 +37,22 @@
//Those attributes are not transient ---------------
//Header fields
//Set by send() method
- public Destination jmsDestination=null;
- public int jmsDeliveryMode=-1;
- public long jmsExpiration=0;
- public int jmsPriority=-1;
- public String jmsMessageID=null;
- public long jmsTimeStamp=0;
+ private Destination jmsDestination=null;
+ private int jmsDeliveryMode=-1;
+ private long jmsExpiration=0;
+ private int jmsPriority=-1;
+ private String jmsMessageID=null;
+ private long jmsTimeStamp=0;
//Set by the client
- public boolean jmsCorrelationID=true;
- public String jmsCorrelationIDString=null;
- public byte[] jmsCorrelationIDbyte=null;
- public Destination jmsReplyTo=null;
- public String jmsType=null;
+ private boolean jmsCorrelationID=true;
+ private String jmsCorrelationIDString=null;
+ private byte[] jmsCorrelationIDbyte=null;
+ private Destination jmsReplyTo=null;
+ private String jmsType=null;
//Set by the provider
- public boolean jmsRedelivered=false;
+ private boolean jmsRedelivered=false;
//Properties
- public Hashtable prop;
+ private Hashtable prop;
public boolean propReadWrite;
//Message body
public boolean msgReadOnly=false;
@@ -409,8 +409,6 @@
}
-
-
void setReadOnlyMode()
{
propReadWrite=false;
@@ -450,7 +448,7 @@
return 1;
}
return (int)(messageId - sm.messageId);
- }
+ }
public void doAcknowledge() throws JMSException
1.13 +62 -45 spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- SpyMessageConsumer.java 2000/12/27 17:02:26 1.12
+++ SpyMessageConsumer.java 2000/12/31 23:46:32 1.13
@@ -24,7 +24,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
@@ -216,46 +216,6 @@
}
-
- SpyMessage getMessage() {
- synchronized (messages) {
-
- while (true) {
-
- try {
- if (messages.size() == 0)
- return null;
-
- SpyMessage mes = (SpyMessage)
messages.removeFirst();
-
- if (mes.isOutdated()) {
- Log.notice("SessionQueue: I dropped a
message (timeout)");
- continue;
- }
-
- //the SAME Message object is put in different
SessionQueues
- //when we deliver it, we have to clone() it to
insure independance
- SpyMessage message = mes.myClone();
- message.session = session;
-
- if (message.shouldAck && session.transacted) {
-
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId,
message);
- } else if (message.shouldAck &&
session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode ==
session.DUPS_OK_ACKNOWLEDGE) {
- message.doAcknowledge();
- }
-
- return message;
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- }
-
- }
-
public void addMessage(SpyMessage mes) throws JMSException {
synchronized (messages) {
//Add a message to the queue
@@ -281,7 +241,7 @@
if( messageListener==null && !subscription.receiving &&
(this instanceof SpyQueueReceiver ||
subscription.durableSubscriptionName!=null) ) {
- SpyMessage mes = getMessage();
+ Message mes = getMessage();
while (mes != null) {
Log.log("Got unrequested message, sending NACK
for: " + mes);
@@ -298,7 +258,7 @@
return false;
} else if (closed) { // If closed, dump the message
- SpyMessage mes = getMessage();
+ Message mes = getMessage();
while (mes != null) {
mes = getMessage();
}
@@ -309,11 +269,17 @@
if (messageListener == null) {
messages.notify();
} else {
- SpyMessage mes = getMessage();
+ SpyMessage mes = (SpyMessage)getMessage();
if (mes == null)
return false;
- messageListener.onMessage(mes);
+ Message message = mes;
+ if( mes instanceof SpyEncapsulatedMessage ) {
+ message =
((SpyEncapsulatedMessage)mes).getMessage();
+ }
+
+ messageListener.onMessage(message);
+
if (session.transacted) {
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
} else if (session.acknowledgeMode ==
session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) {
@@ -345,5 +311,56 @@
public String toString() {
return "SpyMessageConsumer:"+subscription.destination;
+ }
+
+ Message getMessage() {
+ synchronized (messages) {
+
+ while (true) {
+
+ try {
+ if (messages.size() == 0)
+ return null;
+
+ SpyMessage mes = (SpyMessage)
messages.removeFirst();
+
+ //the SAME Message object is put in different
SessionQueues
+ //when we deliver it, we have to clone() it to
insure independance
+ SpyMessage message = mes.myClone();
+ message.session = session;
+
+ // Has the message expired?
+ if (mes.isOutdated()) {
+ Log.notice("SessionQueue: I dropped a
message (timeout)");
+ if ( message.shouldAck )
+ message.doAcknowledge();
+ continue;
+ }
+
+ // Should we try to ack before the message is
processed?
+ if (messageListener == null) {
+
+ if (message.shouldAck &&
session.transacted) {
+
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId,
message);
+ } else if (message.shouldAck &&
session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode ==
session.DUPS_OK_ACKNOWLEDGE) {
+ message.doAcknowledge();
+ }
+
+ if( message instanceof
SpyEncapsulatedMessage )
+ return
((SpyEncapsulatedMessage)message).getMessage();
+ return message;
+
+ } else {
+ return message;
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ }
+
}
}
1.5 +15 -10 spyderMQ/src/java/org/spydermq/SpyQueueSender.java
Index: SpyQueueSender.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSender.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyQueueSender.java 2000/12/12 21:04:22 1.4
+++ SpyQueueSender.java 2000/12/31 23:46:32 1.5
@@ -16,8 +16,9 @@
* This class implements javax.jms.QueueSender
*
* @author Norbert Lataille ([EMAIL PROTECTED])
+ * @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyQueueSender
extends SpyMessageProducer
@@ -64,14 +65,10 @@
send(queue,message,deliveryMode,priority,timeToLive);
}
- public void send(Queue queue, Message mes, int deliveryMode, int priority,
long timeToLive) throws JMSException
- {
- //We only accept our classes (for now)
- if (!(mes instanceof SpyMessage)) throw new JMSException("I cannot
deliver this message");
- SpyMessage message=(SpyMessage)mes;
-
+ public void send(Queue queue, Message message, int deliveryMode, int priority,
long timeToLive) throws JMSException
+ {
//Set the header fields
- message.jmsDestination=queue;
+ message.setJMSDestination(queue);
message.setJMSDeliveryMode(deliveryMode);
long ts=System.currentTimeMillis();
message.setJMSTimestamp(ts);
@@ -83,9 +80,17 @@
message.setJMSPriority(priority);
message.setJMSMessageID(session.getNewMessageID());
- //Clone the message so we can make the outbound message read only
- SpyMessage clone = message.myClone();
+ // Encapsulate the message if not a SpyMessage
+ if (!(message instanceof SpyMessage)) {
+ SpyEncapsulatedMessage m = new SpyEncapsulatedMessage();
+ m.setMessage(message);
+ message = m;
+ }
+
+ // Clone the message so we can make the outbound message read only
+ SpyMessage clone = ((SpyMessage)message).myClone();
clone.setReadOnlyMode();
+
//Send the message.
session.sendMessage(clone);
}
1.5 +13 -9 spyderMQ/src/java/org/spydermq/SpyTopicPublisher.java
Index: SpyTopicPublisher.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicPublisher.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyTopicPublisher.java 2000/12/12 21:04:22 1.4
+++ SpyTopicPublisher.java 2000/12/31 23:46:32 1.5
@@ -17,8 +17,9 @@
* This class implements javax.jms.TopicPublisher
*
* @author Norbert Lataille ([EMAIL PROTECTED])
+ * @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyTopicPublisher
extends SpyMessageProducer
@@ -65,14 +66,10 @@
publish(myTopic,message,deliveryMode,priority,timeToLive);
}
- public void publish(Topic topic, Message mes, int deliveryMode, int priority,
long timeToLive) throws JMSException
+ public void publish(Topic topic, Message message, int deliveryMode, int
priority, long timeToLive) throws JMSException
{
- //We only accept our classes (for now)
- if (!(mes instanceof SpyMessage)) throw new JMSException("I cannot
deliver this message");
- SpyMessage message=(SpyMessage)mes;
-
//Set the header fields
- message.jmsDestination=topic;
+ message.setJMSDestination(topic);
message.setJMSDeliveryMode(deliveryMode);
long ts=System.currentTimeMillis();
message.setJMSTimestamp(ts);
@@ -83,9 +80,16 @@
}
message.setJMSPriority(priority);
message.setJMSMessageID(mySession.getNewMessageID());
+
+ // Encapsulate the message if not a SpyMessage
+ if (!(message instanceof SpyMessage)) {
+ SpyEncapsulatedMessage m = new SpyEncapsulatedMessage();
+ m.setMessage(message);
+ message = m;
+ }
- //Clone the message so we can make the outbound message read only
- SpyMessage clone = message.myClone();
+ //Clone the message so we can make the outbound message read only
+ SpyMessage clone = ((SpyMessage)message).myClone();
clone.setReadOnlyMode();
//Send the message
mySession.sendMessage(clone);