User: hiram   
  Date: 01/02/21 22:12:28

  Modified:    src/main/org/jbossmq/cluster Cluster.java
                        ClusterTesterReceiver.java ClusterTesterSender.java
                        InvalidConfigurationException.java
                        InvalidStateException.java SerializerUtil.java
                        TopicListener.java Transport.java
                        TransportListener.java
  Added:       src/main/org/jbossmq/cluster NodeId.java
  Log:
  Improved easy of use by removing the requirment of having to explicitly set the 
nodeId of a cluster.
  We now uses normal UDP packets for resend requests (In other words resend requests 
are not broadcasted).
  Message transmition rates are slowed down when a node starts receiving resend 
requests.
  
  Revision  Changes    Path
  1.2       +23 -33    jbossmq/src/main/org/jbossmq/cluster/Cluster.java
  
  Index: Cluster.java
  ===================================================================
  RCS file: /products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/Cluster.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Cluster.java      2001/02/15 03:33:04     1.1
  +++ Cluster.java      2001/02/22 06:12:26     1.2
  @@ -36,7 +36,7 @@
    *
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class Cluster {
   
  @@ -48,10 +48,8 @@
        // The transport layer that the cluster will be using
        private Transport transport;
   
  -     // The node id of the local process.  All process in a cluster MUST
  -     // have unique nodeIds
  -     public short nodeId;
   
  +
        // The managment topic is reserved for the transport layer's use.
        public static final short MANAGMENT_TOPIC = 0;
   
  @@ -60,7 +58,7 @@
        // easy to use.
        private MyTransportListener transportListener = new MyTransportListener();
        private class MyTransportListener implements TransportListener {
  -             public void messageArrivedEvent(short topic, short senderId, byte 
message[])
  +             public void messageArrivedEvent(short topic, NodeId senderId, byte 
message[])
                        throws InterruptedException {
   
                        Short id = new Short(topic);
  @@ -116,19 +114,9 @@
                transport.setCluster(this);
        }
   
  -     /**
  -      * Every node in the cluster has to have a unique id.
  -      */
  -     public void setNodeId(short newNodeId) {
  -             nodeId = newNodeId;
  -     }
   
  -     /**
  -      * The id of this node of the cluster.
  -      */
  -     public short getNodeId() {
  -             return nodeId;
  -     }
  +
  +
   
        /**
         * The transport being used by the cluster.
  @@ -249,23 +237,7 @@
   
        }
   
  -     /**
  -      * Used internally to send a message to all the local listeners.
  -      */
  -     private void messageArrivedEvent(short topic, short sender, Object message)
  -             throws InterruptedException {
  -             LinkedList ll = (LinkedList) topicListeners.get(new Short(topic));
  -             if (ll == null) {
  -                     // No listeners...
  -                     return;
  -             }
   
  -             Iterator i = ll.iterator();
  -             while (i.hasNext()) {
  -                     TopicListener c = (TopicListener) i.next();
  -                     c.onMessage(topic, sender, message);
  -             }
  -     }
   
        /**
         * Send a message to the cluster on the given channel.
  @@ -281,7 +253,7 @@
                throws IOException, InterruptedException {
   
                if (localBroadcast)
  -                     messageArrivedEvent(topic, getNodeId(), message);
  +                     messageArrivedEvent(topic, null, message);
   
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream os = new ObjectOutputStream(bos);
  @@ -296,5 +268,23 @@
                byte data[] = bos.toByteArray();
                transport.send(topic, data, droppable, keepOrder);
   
  +     }
  +
  +     /**
  +      * Used internally to send a message to all the local listeners.
  +      */
  +     private void messageArrivedEvent(short topic, NodeId sender, Object message)
  +             throws InterruptedException {
  +             LinkedList ll = (LinkedList) topicListeners.get(new Short(topic));
  +             if (ll == null) {
  +                     // No listeners...
  +                     return;
  +             }
  +
  +             Iterator i = ll.iterator();
  +             while (i.hasNext()) {
  +                     TopicListener c = (TopicListener) i.next();
  +                     c.onMessage(topic, sender, message);
  +             }
        }
   }
  
  
  
  1.2       +1 -2      jbossmq/src/main/org/jbossmq/cluster/ClusterTesterReceiver.java
  
  Index: ClusterTesterReceiver.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/ClusterTesterReceiver.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ClusterTesterReceiver.java        2001/02/15 03:33:04     1.1
  +++ ClusterTesterReceiver.java        2001/02/22 06:12:26     1.2
  @@ -12,7 +12,7 @@
   /**
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class ClusterTesterReceiver {
   
  @@ -28,14 +28,13 @@
                is.close();
   
                transport.setProperties(p);
  -             cluster.setNodeId(Short.parseShort(p.getProperty("NodeId")));
                cluster.setTransport(transport);
   
                short echoChannelId = 13;
   
                final Channel echoChannel = new BoundedLinkedQueue(10);
                TopicListener myListener = new TopicListener() {
  -                     public void onMessage(short topic, short sender, Object m) {
  +                     public void onMessage(short topic, NodeId sender, Object m) {
                                try {
                                        System.out.println("Got :" + m);
                                        //echoChannel.put( m );
  
  
  
  1.2       +0 -1      jbossmq/src/main/org/jbossmq/cluster/ClusterTesterSender.java
  
  Index: ClusterTesterSender.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/ClusterTesterSender.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ClusterTesterSender.java  2001/02/15 03:33:04     1.1
  +++ ClusterTesterSender.java  2001/02/22 06:12:26     1.2
  @@ -12,7 +12,7 @@
   /**
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class ClusterTesterSender {
        
  @@ -32,7 +32,6 @@
                is.close();
   
                transport.setProperties(p);
  -             cluster.setNodeId(Short.parseShort(p.getProperty("NodeId")));
                cluster.setTransport(transport);
   
                transport.start();
  
  
  
  1.2       +0 -0      
jbossmq/src/main/org/jbossmq/cluster/InvalidConfigurationException.java
  
  Index: InvalidConfigurationException.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/InvalidConfigurationException.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- InvalidConfigurationException.java        2001/02/15 03:33:04     1.1
  +++ InvalidConfigurationException.java        2001/02/22 06:12:26     1.2
  @@ -11,7 +11,7 @@
   /**
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class InvalidConfigurationException extends Exception {
        
  
  
  
  1.2       +0 -0      jbossmq/src/main/org/jbossmq/cluster/InvalidStateException.java
  
  Index: InvalidStateException.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/InvalidStateException.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- InvalidStateException.java        2001/02/15 03:33:05     1.1
  +++ InvalidStateException.java        2001/02/22 06:12:26     1.2
  @@ -11,7 +11,7 @@
   /**
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class InvalidStateException extends Exception {
        
  
  
  
  1.2       +12 -0     jbossmq/src/main/org/jbossmq/cluster/SerializerUtil.java
  
  Index: SerializerUtil.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/SerializerUtil.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SerializerUtil.java       2001/02/15 03:33:05     1.1
  +++ SerializerUtil.java       2001/02/22 06:12:26     1.2
  @@ -14,7 +14,7 @@
    *
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class SerializerUtil {
   
  @@ -84,6 +84,18 @@
        
        public static void writeShortTo(short v, byte[] data, int pos) {
                data[pos] = (byte) ((v >>> 8) & 0xFF);
  +             data[pos + 1] = (byte) ((v >>> 0) & 0xFF);
  +     }
  +
  +     public static int readUShortFrom(byte[] data, int offset) {
  +             return (
  +                           ((data[offset + 0] & 0xFF) << 8)
  +                             | ((data[offset + 1] & 0xFF) << 0)
  +                        );
  +     }
  +
  +     static public void writeUShortTo(int v, byte[] data, int pos) {
  +             data[pos + 0] = (byte) ((v >>> 8) & 0xFF);
                data[pos + 1] = (byte) ((v >>> 0) & 0xFF);
        }
   }
  
  
  
  1.2       +2 -1      jbossmq/src/main/org/jbossmq/cluster/TopicListener.java
  
  Index: TopicListener.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/TopicListener.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- TopicListener.java        2001/02/15 03:33:05     1.1
  +++ TopicListener.java        2001/02/22 06:12:27     1.2
  @@ -14,9 +14,10 @@
    *
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    *
    */
   public interface TopicListener {
  -     public void onMessage( short topic, short sender, Object message );
  +
  +     public void onMessage( short topic, NodeId sender, Object message );
   }
  
  
  
  1.2       +0 -0      jbossmq/src/main/org/jbossmq/cluster/Transport.java
  
  Index: Transport.java
  ===================================================================
  RCS file: /products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/Transport.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Transport.java    2001/02/15 03:33:05     1.1
  +++ Transport.java    2001/02/22 06:12:27     1.2
  @@ -13,7 +13,7 @@
    *
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    *
    */
   public interface Transport {
  
  
  
  1.2       +2 -1      jbossmq/src/main/org/jbossmq/cluster/TransportListener.java
  
  Index: TransportListener.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/TransportListener.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- TransportListener.java    2001/02/15 03:33:05     1.1
  +++ TransportListener.java    2001/02/22 06:12:27     1.2
  @@ -16,8 +16,9 @@
    * 
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public interface TransportListener {
  -     public void messageArrivedEvent( short channelId, short senderId, byte 
message[] ) throws InterruptedException;
  +
  +     public void messageArrivedEvent( short channelId, NodeId senderId, byte 
message[] ) throws InterruptedException;
   }
  
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/NodeId.java
  
  Index: NodeId.java
  ===================================================================
  package org.jbossmq.cluster;
  
  import java.lang.Object;
  
  /**
   * 
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public interface NodeId {
  }
  
  
  

Reply via email to