User: hiram   
  Date: 01/02/14 19:33:06

  Added:       src/main/org/jbossmq/cluster Cluster.java
                        ClusterTesterReceiver.java ClusterTesterSender.java
                        InvalidConfigurationException.java
                        InvalidStateException.java SerializerUtil.java
                        TopicListener.java Transport.java
                        TransportListener.java
  Log:
  Implemented a Multicast based pub/sub subsystem targeted for building clustered 
applications.
  
  Revision  Changes    Path
  1.1                  jbossmq/src/main/org/jbossmq/cluster/Cluster.java
  
  Index: Cluster.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster;
  
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.Iterator;
  
  import java.io.ObjectOutputStream;
  import java.io.ByteArrayInputStream;
  import java.io.Externalizable;
  import java.io.ObjectInputStream;
  import java.io.IOException;
  import java.io.ByteArrayOutputStream;
  
  /**
   * This class provides the interface to send messages to a cluster.
   * The goal was to implement a highly efficient messaging backbone for
   * inter node communications of a cluster of JVMs.
   *
   * The messages can be either Serializable objects or CustomSerializer
   * Objects.  The CustomSerializer objects can be used if a highly efficient
   * message format is required.
   *
   * This class behaves similarly to a TopicSession in JMS.  It will broadcast
   * messages to all listeners of a topic in the cluster.
   * To make it efficient, topics are identified by a short!
   *
   * TopicListeners should not process the inbound messages in the onMessage(...)
   * method.  They should queue the message so that it is processed async.
   * This will allow the cluster to dispatch messages to all the listeners quicker.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public class Cluster {

      // Maps topic id (Short) -> LinkedList of TopicListeners.  A published map
      // (and the lists stored in it) is never modified: writers build a changed
      // copy and then swap this reference, so dispatch can iterate without
      // locking.  volatile makes each swap visible to the reading threads.
      private volatile HashMap topicListeners = new HashMap();
      // Maps topic id (Short) -> Class of the custom Externalizable used on that
      // topic.  Updated with the same copy-then-swap scheme.
      private volatile HashMap customExternalizers = new HashMap();

      // Write locks.  Dedicated final objects are used because the maps above
      // are replaced on every update; locking on the map itself would let two
      // writers hold different monitors and silently lose one of the updates.
      private final Object topicListenersLock = new Object();
      private final Object customExternalizersLock = new Object();

      // The transport layer that the cluster will be using
      private Transport transport;

      // The node id of the local process.  All processes in a cluster MUST
      // have unique nodeIds
      public short nodeId;

      // The management topic is reserved for the transport layer's use.
      public static final short MANAGMENT_TOPIC = 0;

      // This is the TransportListener the cluster will use.
      // Implemented as an inner class to keep the cluster's public interface
      // easy to use.
      private MyTransportListener transportListener = new MyTransportListener();

      private class MyTransportListener implements TransportListener {

          /**
           * Decodes a raw message handed over by the transport and passes the
           * resulting object to every local listener of the topic.
           * Nothing is decoded when the topic has no local listeners.
           * Any exception raised while decoding or dispatching is printed and
           * not propagated to the transport.
           */
          public void messageArrivedEvent(short topic, short senderId, byte message[])
              throws InterruptedException {

              Short id = new Short(topic);
              LinkedList listeners = (LinkedList) topicListeners.get(id);
              if (listeners == null) {
                  // No listeners...
                  return;
              }

              try {
                  ObjectInputStream is =
                      new ObjectInputStream(new ByteArrayInputStream(message));

                  Object payload;
                  Class externalizerClass = (Class) customExternalizers.get(id);
                  if (externalizerClass != null) {
                      // Fixed message type for this topic: create an empty
                      // instance and let it read its own wire format.
                      Externalizable s = (Externalizable) externalizerClass.newInstance();
                      s.readExternal(is);
                      payload = s;
                  } else {
                      payload = is.readObject();
                  }

                  deliver(listeners, topic, senderId, payload);

              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }

      /**
       * Dumb constructor.
       * You must initialize the cluster before using it.
       * (Sending messages, starting the transport layer)
       */
      public Cluster() {
          super();
      }

      /**
       * Allows us to change the transport protocol used for the cluster.
       * The new transport is given this cluster and the cluster's
       * TransportListener.
       */
      public void setTransport(Transport newTransport) {
          transport = newTransport;
          transport.setTransportListener(transportListener);
          transport.setCluster(this);
      }

      /**
       * Every node in the cluster has to have a unique id.
       */
      public void setNodeId(short newNodeId) {
          nodeId = newNodeId;
      }

      /**
       * The id of this node of the cluster.
       */
      public short getNodeId() {
          return nodeId;
      }

      /**
       * The transport being used by the cluster.
       * @return Transport
       */
      public Transport getTransport() {
          return transport;
      }

      /**
       * Send a message to the cluster on the given channel.
       * The message will also be sent to the local node.
       * The message will be marked as not droppable and order preserving.
       */
      public void send(short topic, Object message)
          throws IOException, InterruptedException {
          // Send w/ local broadcast && non droppable && keep order
          send(topic, message, true, false, true);
      }

      /**
       * Registers a TopicListener to receive messages on a topic.
       */
      public void addTopicListener(short topic, TopicListener c) {
          Short id = new Short(topic);

          // Write Lock
          synchronized (topicListenersLock) {

              // We do not modify the current listener list or map
              // so that iterators held by dispatching threads do not fail.
              LinkedList current = (LinkedList) topicListeners.get(id);
              LinkedList updated;
              if (current == null) {
                  updated = new LinkedList();
              } else {
                  updated = (LinkedList) current.clone();
              }
              updated.add(c);

              HashMap t = (HashMap) topicListeners.clone();
              t.put(id, updated);
              topicListeners = t;
          }
      }

      /**
       * UnRegisters a TopicListener from a topic.
       * Does nothing if the listener was not registered on the topic.
       */
      public void removeTopicListener(short topic, TopicListener c) {
          Short id = new Short(topic);

          // Write Lock
          synchronized (topicListenersLock) {
              LinkedList current = (LinkedList) topicListeners.get(id);
              if (current == null) {
                  // Listener had not been added
                  return;
              }

              // Work on a copy so that iterators held by dispatching
              // threads do not fail.
              LinkedList updated = (LinkedList) current.clone();
              if (!updated.remove(c)) {
                  // Listener had not been added
                  return;
              }

              HashMap t = (HashMap) topicListeners.clone();
              if (updated.size() > 0) {
                  t.put(id, updated);
              } else {
                  // Last listener gone: drop the topic entry altogether
                  t.remove(id);
              }
              topicListeners = t;
          }
      }

      /**
       * Are there any local listeners on the given topic
       */
      public boolean isListeningOn(short topic) {
          return topicListeners.get(new Short(topic)) != null;
      }

      /**
       * Adds a custom externalizer for a channel.
       *
       * Using a custom externalizer for a channel could give BIG performance
       * gains since the type of object that will be sent over the channel will be
       * FIXED.
       *
       * @param topic the channel the externalizer applies to
       * @param c     class instantiated with newInstance() and cast to
       *              Externalizable for every message received on the channel
       */
      public void addCustomExternalizer(short topic, Class c) {
          Short id = new Short(topic);

          // Write Lock
          synchronized (customExternalizersLock) {
              // Copy the externalizer table itself (not the listener table)
              // so existing registrations are kept.
              HashMap t = (HashMap) customExternalizers.clone();
              t.put(id, c);
              customExternalizers = t;
          }
      }

      /**
       * Removes the custom externalizer for the channel.
       *
       * @param topic the channel whose externalizer is removed
       * @param c     not used; whatever class is registered for the channel
       *              is removed
       */
      public void removeCustomExternalizer(short topic, Class c) {
          Short id = new Short(topic);

          // Write Lock
          synchronized (customExternalizersLock) {
              HashMap t = (HashMap) customExternalizers.clone();
              t.remove(id);
              customExternalizers = t;
          }
      }

      /**
       * Used internally to send a message to all the local listeners.
       */
      private void messageArrivedEvent(short topic, short sender, Object message)
          throws InterruptedException {
          LinkedList listeners = (LinkedList) topicListeners.get(new Short(topic));
          if (listeners == null) {
              // No listeners...
              return;
          }
          deliver(listeners, topic, sender, message);
      }

      /**
       * Calls onMessage(...) on every listener in the given list, in list order.
       */
      private void deliver(LinkedList listeners, short topic, short sender, Object message) {
          Iterator i = listeners.iterator();
          while (i.hasNext()) {
              TopicListener c = (TopicListener) i.next();
              c.onMessage(topic, sender, message);
          }
      }

      /**
       * Send a message to the cluster on the given channel.
       * The local node will also get the message if localBroadcast is true;
       * local listeners are called before the message is handed to the transport.
       * The droppable and keepOrder flags are passed through to the transport layer.
       *
       * @throws IOException if the message cannot be written to its byte form
       *         or the transport reports an I/O failure
       */
      public void send(
          short topic,
          Object message,
          boolean localBroadcast,
          boolean droppable,
          boolean keepOrder)
          throws IOException, InterruptedException {

          if (localBroadcast) {
              messageArrivedEvent(topic, getNodeId(), message);
          }

          ByteArrayOutputStream bos = new ByteArrayOutputStream();
          ObjectOutputStream os = new ObjectOutputStream(bos);

          // NOTE(review): the receiving side only calls readExternal(...) when a
          // custom externalizer is registered for the topic; an Externalizable
          // sent on a topic without one is read back with readObject().
          // Confirm that every topic carrying Externalizable messages has an
          // externalizer registered on all nodes.
          if (message instanceof Externalizable) {
              ((Externalizable) message).writeExternal(os);
          } else {
              os.writeObject(message);
          }
          os.close();

          byte data[] = bos.toByteArray();
          transport.send(topic, data, droppable, keepOrder);
      }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/ClusterTesterReceiver.java
  
  Index: ClusterTesterReceiver.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster;
  
  import EDU.oswego.cs.dl.util.concurrent.*;
  import org.jbossmq.cluster.udp.UDPTransport;
  
  /**
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public class ClusterTesterReceiver {

      /**
       * Manual test program: joins the cluster using the settings in the
       * class path resource "receiver.txt" and prints every message that
       * arrives on topic 13.
       *
       * @param args not used
       * @throws java.io.FileNotFoundException if "receiver.txt" is not on the class path
       */
      public static void main(java.lang.String[] args) throws Exception {

          Cluster cluster = new Cluster();
          UDPTransport transport = new UDPTransport();

          java.util.Properties p = new java.util.Properties();
          java.io.InputStream is =
              transport.getClass().getClassLoader().getResourceAsStream("receiver.txt");
          if (is == null) {
              // getResourceAsStream returns null instead of throwing
              throw new java.io.FileNotFoundException(
                  "receiver.txt not found on the class path");
          }
          try {
              p.load(is);
          } finally {
              is.close();
          }

          transport.setProperties(p);
          cluster.setNodeId(Short.parseShort(p.getProperty("NodeId")));
          cluster.setTransport(transport);

          short echoChannelId = 13;

          final Channel echoChannel = new BoundedLinkedQueue(10);
          TopicListener myListener = new TopicListener() {
              public void onMessage(short topic, short sender, Object m) {
                  try {
                      System.out.println("Got :" + m);
                      // Hand-off to the main thread is currently disabled:
                      //echoChannel.put( m );
                  } catch (Exception e) {
                      // Report the failure instead of dropping it silently
                      e.printStackTrace();
                  }
              }
          };

          cluster.addTopicListener(echoChannelId, myListener);

          transport.start();

          System.out.println("Waiting for messages");
          // The listener above does not put anything on echoChannel, so
          // take() blocks here while the listener prints the messages.
          while (true) {
              String t = (String) echoChannel.take();
              System.out.println("Got :" + t);
          }
      }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/ClusterTesterSender.java
  
  Index: ClusterTesterSender.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster;
  
  import EDU.oswego.cs.dl.util.concurrent.*;
  import org.jbossmq.cluster.udp.UDPTransport;
  
  /**
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public class ClusterTesterSender {

      /**
       * Manual test program: joins the cluster using the settings in the
       * class path resource "sender.txt", sends 500 numbered
       * "Hello World" strings on topic 13 and then waits.
       *
       * @param args an array of command-line arguments (not used)
       * @throws java.io.FileNotFoundException if "sender.txt" is not on the class path
       */
      public static void main(java.lang.String[] args) throws Exception {

          Cluster cluster = new Cluster();
          UDPTransport transport = new UDPTransport();

          java.util.Properties p = new java.util.Properties();
          java.io.InputStream is =
              transport.getClass().getClassLoader().getResourceAsStream("sender.txt");
          if (is == null) {
              // getResourceAsStream returns null instead of throwing
              throw new java.io.FileNotFoundException(
                  "sender.txt not found on the class path");
          }
          try {
              p.load(is);
          } finally {
              is.close();
          }

          transport.setProperties(p);
          cluster.setNodeId(Short.parseShort(p.getProperty("NodeId")));
          cluster.setTransport(transport);

          transport.start();

          short echoChannelId = 13;

          String t = "Hello World";
          for (int i = 0; i < 500; i++) {
              cluster.send(echoChannelId, t + " #" + i);
          }
          System.out.println("Message, sent");

          // Block the main thread after sending.
          // NOTE(review): nothing visible here ever notifies on the cluster
          // object, so this presumably just keeps the process alive - confirm.
          synchronized (cluster) {
              cluster.wait();
          }
      }
  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/cluster/InvalidConfigurationException.java
  
  Index: InvalidConfigurationException.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster;
  
  import java.lang.Exception;
  
  /**
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public class InvalidConfigurationException extends Exception {

      /**
       * Creates the exception without a detail message.
       */
      public InvalidConfigurationException() {
      }

      /**
       * Creates the exception with a detail message.
       * @param s the detail message, later available through getMessage()
       */
      public InvalidConfigurationException(String s) {
          super(s);
      }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/InvalidStateException.java
  
  Index: InvalidStateException.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster;
  
  import java.lang.Exception;
  
  /**
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public class InvalidStateException extends Exception {

      /**
       * Creates the exception without a detail message.
       */
      public InvalidStateException() {
      }

      /**
       * Creates the exception with a detail message.
       * @param s the detail message, later available through getMessage()
       */
      public InvalidStateException(String s) {
          super(s);
      }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/SerializerUtil.java
  
  Index: SerializerUtil.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster;
  
  import java.lang.Object;
  
  /**
   * This is a utility class which holds functions useful for converting
   * between Java primitive types and byte[]s
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public class SerializerUtil {

      /**
       * This is used for unit testing the class: it writes a range of
       * int, long and short values into a buffer, reads them back and
       * prints both values side by side.
       * @param args java.lang.String[]
       */
      public static void main(String[] args) {

          byte data[] = new byte[8];

          for (int i = -10; i < 150; i++) {
              writeIntTo(i, data, 0);
              int r = readIntFrom(data, 0);
              System.out.println("Int :" + i + "==" + r);
          }
          for (long i = -10; i < 150; i++) {
              writeLongTo(i, data, 0);
              long r = readLongFrom(data, 0);
              System.out.println("Long :" + i + "==" + r);
          }
          // 300 is above 255 so that the high byte is exercised as well
          for (short i = -10; i < 300; i++) {
              writeShortTo(i, data, 0);
              short r = readShortFrom(data, 0);
              System.out.println("Short :" + i + "==" + r);
          }

      }

      /**
       * Reads a big-endian int from the 4 bytes starting at data[offset].
       */
      public static int readIntFrom(byte[] data, int offset) {
          return (
              ((data[offset] & 0xFF) << 24)
                  | ((data[offset + 1] & 0xFF) << 16)
                  | ((data[offset + 2] & 0xFF) << 8)
                  | ((data[offset + 3] & 0xFF) << 0));
      }

      /**
       * Reads a big-endian long from the 8 bytes starting at data[offset].
       */
      public static long readLongFrom(byte[] data, int offset) {
          return (
              ((long) (data[offset] & 0xFF) << 56)
                  | ((long) (data[offset + 1] & 0xFF) << 48)
                  | ((long) (data[offset + 2] & 0xFF) << 40)
                  | ((long) (data[offset + 3] & 0xFF) << 32)
                  | ((long) (data[offset + 4] & 0xFF) << 24)
                  | ((long) (data[offset + 5] & 0xFF) << 16)
                  | ((long) (data[offset + 6] & 0xFF) << 8)
                  | ((long) (data[offset + 7] & 0xFF) << 0));
      }

      /**
       * Reads a big-endian short from the 2 bytes starting at data[offset].
       * This is the inverse of writeShortTo(...).
       */
      public static short readShortFrom(byte[] data, int offset) {
          // Mask each byte BEFORE shifting: masking after the shift would
          // clear the high byte and the value would not round-trip.
          return (short)
              (((data[offset] & 0xFF) << 8) | (data[offset + 1] & 0xFF));
      }

      /**
       * Writes v as 4 big-endian bytes into data, starting at data[pos].
       */
      static public void writeIntTo(int v, byte[] data, int pos) {
          data[pos] = (byte) ((v >>> 24) & 0xFF);
          data[pos + 1] = (byte) ((v >>> 16) & 0xFF);
          data[pos + 2] = (byte) ((v >>> 8) & 0xFF);
          data[pos + 3] = (byte) ((v >>> 0) & 0xFF);
      }

      /**
       * Writes v as 8 big-endian bytes into data, starting at data[pos].
       */
      static public void writeLongTo(long v, byte data[], int pos) {
          data[pos] = (byte) ((v >>> 56) & 0xFF);
          data[pos + 1] = (byte) ((v >>> 48) & 0xFF);
          data[pos + 2] = (byte) ((v >>> 40) & 0xFF);
          data[pos + 3] = (byte) ((v >>> 32) & 0xFF);
          data[pos + 4] = (byte) ((v >>> 24) & 0xFF);
          data[pos + 5] = (byte) ((v >>> 16) & 0xFF);
          data[pos + 6] = (byte) ((v >>> 8) & 0xFF);
          data[pos + 7] = (byte) ((v >>> 0) & 0xFF);
      }

      /**
       * Writes v as 2 big-endian bytes into data, starting at data[pos].
       */
      public static void writeShortTo(short v, byte[] data, int pos) {
          data[pos] = (byte) ((v >>> 8) & 0xFF);
          data[pos + 1] = (byte) ((v >>> 0) & 0xFF);
      }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/TopicListener.java
  
  Index: TopicListener.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster;
  
  import java.lang.Object;
  
  /**
   * This is the interface clients must implement to receive messages
   * from the cluster.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   *
   */
  public interface TopicListener {

      /**
       * Receives one message.
       *
       * @param topic   id of the topic the message was sent on
       * @param sender  node id of the sender
       * @param message the message object
       */
      void onMessage(short topic, short sender, Object message);
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/Transport.java
  
  Index: Transport.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster;
  
  import java.io.IOException;
  
  /**
   * This is the interface any Transport class must implement.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   *
   */
  public interface Transport {

      /**
       * Gives the transport the cluster it works for.
       */
      void setCluster(Cluster c);

      /**
       * Gives the transport the listener for incoming messages.
       */
      void setTransportListener(TransportListener c);

      /**
       * Sends one already-serialized message on a channel.
       *
       * @param channelId id of the channel (topic) to send on
       * @param data      the message bytes
       * @param droppable flag supplied by the sender; its meaning is defined by the implementation
       * @param keepOrder flag supplied by the sender; its meaning is defined by the implementation
       */
      void send(short channelId, byte[] data, boolean droppable, boolean keepOrder)
          throws IOException, InterruptedException;

  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/TransportListener.java
  
  Index: TransportListener.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster;
  
  import java.lang.*;
  
  import java.lang.Object;
  
  /**
   * This interface is used by the Transport to signal the
   * cluster that a message has arrived from the network.
   * 
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public interface TransportListener {

      /**
       * Receives one raw message.
       *
       * @param channelId id of the channel (topic) the message belongs to
       * @param senderId  node id of the sender
       * @param message   the message bytes
       */
      void messageArrivedEvent(short channelId, short senderId, byte[] message)
          throws InterruptedException;
  }
  
  

Reply via email to