User: hiram   
  Date: 00/12/31 15:46:33

  Modified:    src/java/org/spydermq SpyEncapsulatedMessage.java
                        SpyMessage.java SpyMessageConsumer.java
                        SpyQueueSender.java SpyTopicPublisher.java
  Log:
  Implemented Non-optimized message delivery
  
  Revision  Changes    Path
  1.2       +35 -7     spyderMQ/src/java/org/spydermq/SpyEncapsulatedMessage.java
  
  Index: SpyEncapsulatedMessage.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyEncapsulatedMessage.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyEncapsulatedMessage.java       2000/05/31 18:06:43     1.1
  +++ SpyEncapsulatedMessage.java       2000/12/31 23:46:32     1.2
  @@ -12,18 +12,46 @@
    *   This Message class is used to send a non 'provider-optimized Message' over the 
network [4.4.5]
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
  + *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class SpyEncapsulatedMessage
  -     extends SpyMessage
  -{
  +     extends SpyObjectMessage
  +{    
  +
        
  -     private Message mes;
  -     
  -     SpyEncapsulatedMessage(Message m)
  +     SpyEncapsulatedMessage() throws javax.jms.JMSException
        {
  -             //mes=m.clone();                
        }
  -     
  +
  +     public Message getMessage() throws javax.jms.JMSException
  +     {
  +             Message m = (Message)this.getObject();
  +             m.setJMSRedelivered( getJMSRedelivered() );
  +             return m;
  +     }
  +
  +     public void setMessage(Message m) throws javax.jms.JMSException
  +     {
  +             this.setObject((java.io.Serializable)m);
  +             
  +             setJMSCorrelationID(m.getJMSCorrelationID());
  +             setJMSCorrelationIDAsBytes(m.getJMSCorrelationIDAsBytes());
  +             setJMSReplyTo(m.getJMSReplyTo());
  +             setJMSType(m.getJMSType());             
  +             setJMSDestination( m.getJMSDestination() );
  +             setJMSDeliveryMode( m.getJMSDeliveryMode() );
  +             setJMSExpiration( m.getJMSExpiration() );
  +             setJMSPriority( m.getJMSPriority() );
  +             setJMSMessageID( m.getJMSMessageID() );
  +             setJMSTimestamp( m.getJMSTimestamp() );
  +
  +             java.util.Enumeration enum = m.getPropertyNames();
  +             while( enum.hasMoreElements() ) {
  +                     String name = (String)enum.nextElement();
  +                     Object o = m.getObjectProperty(name);
  +                     setObjectProperty(name, o);
  +             }
  +     }
   }
  
  
  
  1.12      +15 -17    spyderMQ/src/java/org/spydermq/SpyMessage.java
  
  Index: SpyMessage.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessage.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- SpyMessage.java   2000/12/26 04:15:50     1.11
  +++ SpyMessage.java   2000/12/31 23:46:32     1.12
  @@ -22,7 +22,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.11 $
  + *   @version $Revision: 1.12 $
    */
   public class SpyMessage 
        implements Serializable, Cloneable, Message, Comparable
  @@ -37,22 +37,22 @@
        //Those attributes are not transient ---------------
        //Header fields 
        //Set by send() method
  -     public Destination jmsDestination=null;
  -     public int jmsDeliveryMode=-1;
  -     public long jmsExpiration=0;
  -     public int jmsPriority=-1;
  -     public String jmsMessageID=null;
  -     public long jmsTimeStamp=0;
  +     private Destination jmsDestination=null;
  +     private int jmsDeliveryMode=-1;
  +     private long jmsExpiration=0;
  +     private int jmsPriority=-1;
  +     private String jmsMessageID=null;
  +     private long jmsTimeStamp=0;
        //Set by the client
  -     public boolean jmsCorrelationID=true;
  -     public String jmsCorrelationIDString=null; 
  -     public byte[] jmsCorrelationIDbyte=null;
  -     public Destination jmsReplyTo=null;
  -     public String jmsType=null;
  +     private boolean jmsCorrelationID=true;
  +     private String jmsCorrelationIDString=null; 
  +     private byte[] jmsCorrelationIDbyte=null;
  +     private Destination jmsReplyTo=null;
  +     private String jmsType=null;
        //Set by the provider
  -     public boolean jmsRedelivered=false;
  +     private boolean jmsRedelivered=false;
        //Properties
  -     public Hashtable prop;
  +     private Hashtable prop;
        public boolean propReadWrite;
        //Message body
        public boolean msgReadOnly=false;
  @@ -409,8 +409,6 @@
                
        }
   
  -
  -
        void setReadOnlyMode()
        {
                propReadWrite=false;
  @@ -450,7 +448,7 @@
                        return 1;
                }
                return (int)(messageId - sm.messageId);         
  -     }       
  +     }         
   
        
        public void doAcknowledge() throws JMSException
  
  
  
  1.13      +62 -45    spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- SpyMessageConsumer.java   2000/12/27 17:02:26     1.12
  +++ SpyMessageConsumer.java   2000/12/31 23:46:32     1.13
  @@ -24,7 +24,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.12 $
  + *   @version $Revision: 1.13 $
    */
   public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
   
  @@ -216,46 +216,6 @@
        }
   
   
  -
  -     SpyMessage getMessage() {
  -             synchronized (messages) {
  -
  -                     while (true) {
  -
  -                             try {
  -                                     if (messages.size() == 0)
  -                                             return null;
  -
  -                                     SpyMessage mes = (SpyMessage) 
messages.removeFirst();
  -
  -                                     if (mes.isOutdated()) {
  -                                             Log.notice("SessionQueue: I dropped a 
message (timeout)");
  -                                             continue;
  -                                     }
  -
  -                                     //the SAME Message object is put in different 
SessionQueues
  -                                     //when we deliver it, we have to clone() it to 
insure independance
  -                                     SpyMessage message = mes.myClone();
  -                                     message.session = session;
  -
  -                                     if (message.shouldAck && session.transacted) {
  -                                             
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, 
message);
  -                                     } else if (message.shouldAck && 
session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == 
session.DUPS_OK_ACKNOWLEDGE) {
  -                                             message.doAcknowledge();
  -                                     }
  -
  -                                     return message;
  -
  -                             } catch (Exception e) {
  -                                     e.printStackTrace();
  -                             }
  -
  -                     }
  -
  -             }
  -
  -     }
  -
        public void addMessage(SpyMessage mes) throws JMSException {
                synchronized (messages) {
                        //Add a message to the queue
  @@ -281,7 +241,7 @@
                        if( messageListener==null && !subscription.receiving && 
                                        (this instanceof SpyQueueReceiver || 
subscription.durableSubscriptionName!=null) ) {
   
  -                             SpyMessage mes = getMessage();
  +                             Message mes = getMessage();
                                while (mes != null) {
   
                                        Log.log("Got unrequested message, sending NACK 
for: " + mes);
  @@ -298,7 +258,7 @@
                                return false;
                        } else if (closed) { // If closed, dump the message 
   
  -                             SpyMessage mes = getMessage();
  +                             Message mes = getMessage();
                                while (mes != null) {
                                        mes = getMessage();
                                }
  @@ -309,11 +269,17 @@
                        if (messageListener == null) {
                                messages.notify();
                        } else {
  -                             SpyMessage mes = getMessage();
  +                             SpyMessage mes = (SpyMessage)getMessage();
                                if (mes == null)
                                        return false;
   
  -                             messageListener.onMessage(mes);
  +                             Message message = mes;
  +                             if( mes instanceof SpyEncapsulatedMessage ) {
  +                                     message = 
((SpyEncapsulatedMessage)mes).getMessage();
  +                             }
  +                             
  +                             messageListener.onMessage(message);
  +                             
                                if (session.transacted) {
                                        
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
                                } else if (session.acknowledgeMode == 
session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) {
  @@ -345,5 +311,56 @@
   
        public String toString() {
                return "SpyMessageConsumer:"+subscription.destination;
  +     }
  +
  +     Message getMessage() {
  +             synchronized (messages) {
  +
  +                     while (true) {
  +
  +                             try {
  +                                     if (messages.size() == 0)
  +                                             return null;
  +
  +                                     SpyMessage mes = (SpyMessage) 
messages.removeFirst();
  +
  +                                     //the SAME Message object is put in different 
SessionQueues
  +                                     //when we deliver it, we have to clone() it to 
insure independance
  +                                     SpyMessage message = mes.myClone();
  +                                     message.session = session;
  +
  +                                     // Has the message expired?
  +                                     if (mes.isOutdated()) {
  +                                             Log.notice("SessionQueue: I dropped a 
message (timeout)");
  +                                             if ( message.shouldAck ) 
  +                                                     message.doAcknowledge();
  +                                             continue;
  +                                     }
  +
  +                                     // Should we try to ack before the message is 
processed?
  +                                     if (messageListener == null) {
  +
  +                                             if (message.shouldAck && 
session.transacted) {
  +                                                     
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, 
message);
  +                                             } else if (message.shouldAck && 
session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == 
session.DUPS_OK_ACKNOWLEDGE) {
  +                                                     message.doAcknowledge();
  +                                             }
  +                                             
  +                                             if( message instanceof 
SpyEncapsulatedMessage ) 
  +                                                     return 
((SpyEncapsulatedMessage)message).getMessage();
  +                                             return message;
  +                                             
  +                                     } else {
  +                                             return message;
  +                                     }
  +
  +                             } catch (Exception e) {
  +                                     e.printStackTrace();
  +                             }
  +
  +                     }
  +
  +             }
  +
        }
   }
  
  
  
  1.5       +15 -10    spyderMQ/src/java/org/spydermq/SpyQueueSender.java
  
  Index: SpyQueueSender.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSender.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyQueueSender.java       2000/12/12 21:04:22     1.4
  +++ SpyQueueSender.java       2000/12/31 23:46:32     1.5
  @@ -16,8 +16,9 @@
    *   This class implements javax.jms.QueueSender
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
  + *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class SpyQueueSender 
        extends SpyMessageProducer 
  @@ -64,14 +65,10 @@
                send(queue,message,deliveryMode,priority,timeToLive);
        }
   
  -     public void send(Queue queue, Message mes, int deliveryMode, int priority, 
long timeToLive) throws JMSException
  -     {
  -             //We only accept our classes (for now)
  -             if (!(mes instanceof SpyMessage)) throw new JMSException("I cannot 
deliver this message");
  -             SpyMessage message=(SpyMessage)mes;
  -             
  +     public void send(Queue queue, Message message, int deliveryMode, int priority, 
long timeToLive) throws JMSException
  +     {               
                //Set the header fields
  -             message.jmsDestination=queue;
  +             message.setJMSDestination(queue);
                message.setJMSDeliveryMode(deliveryMode);
                long ts=System.currentTimeMillis();
                message.setJMSTimestamp(ts);
  @@ -83,9 +80,17 @@
                message.setJMSPriority(priority);
                message.setJMSMessageID(session.getNewMessageID());
                
  -             //Clone the message so we can make the outbound message read only
  -             SpyMessage clone = message.myClone();
  +             // Encapsulate the message if not a SpyMessage
  +             if (!(message instanceof SpyMessage)) {
  +                     SpyEncapsulatedMessage m = new SpyEncapsulatedMessage();
  +                     m.setMessage(message);
  +                     message = m;
  +             }
  +             
  +             // Clone the message so we can make the outbound message read only
  +             SpyMessage clone = ((SpyMessage)message).myClone();
                clone.setReadOnlyMode();
  +             
                //Send the message.
                session.sendMessage(clone);
        }
  
  
  
  1.5       +13 -9     spyderMQ/src/java/org/spydermq/SpyTopicPublisher.java
  
  Index: SpyTopicPublisher.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicPublisher.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyTopicPublisher.java    2000/12/12 21:04:22     1.4
  +++ SpyTopicPublisher.java    2000/12/31 23:46:32     1.5
  @@ -17,8 +17,9 @@
    *   This class implements javax.jms.TopicPublisher
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
  + *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class SpyTopicPublisher 
        extends SpyMessageProducer 
  @@ -65,14 +66,10 @@
                publish(myTopic,message,deliveryMode,priority,timeToLive);
        }
   
  -     public void publish(Topic topic, Message mes, int deliveryMode, int priority, 
long timeToLive) throws JMSException
  +     public void publish(Topic topic, Message message, int deliveryMode, int 
priority, long timeToLive) throws JMSException
        {
  -             //We only accept our classes (for now)
  -             if (!(mes instanceof SpyMessage)) throw new JMSException("I cannot 
deliver this message");
  -             SpyMessage message=(SpyMessage)mes;
  -             
                //Set the header fields
  -             message.jmsDestination=topic;
  +             message.setJMSDestination(topic);
                message.setJMSDeliveryMode(deliveryMode);
                long ts=System.currentTimeMillis();
                message.setJMSTimestamp(ts);
  @@ -83,9 +80,16 @@
                }
                message.setJMSPriority(priority);
                message.setJMSMessageID(mySession.getNewMessageID());
  +
  +             // Encapsulate the message if not a SpyMessage
  +             if (!(message instanceof SpyMessage)) {
  +                     SpyEncapsulatedMessage m = new SpyEncapsulatedMessage();
  +                     m.setMessage(message);
  +                     message = m;
  +             }
                
  -             //Clone the message so we can make the outbound message read only      
 
  -             SpyMessage clone = message.myClone();
  +             //Clone the message so we can make the outbound message read only
  +             SpyMessage clone = ((SpyMessage)message).myClone();
                clone.setReadOnlyMode();
                //Send the message
                mySession.sendMessage(clone);
  
  
  

Reply via email to