User: hiram   
  Date: 01/02/14 19:33:07

  Added:       src/main/org/jbossmq/cluster/udp AdminPacketHandler.java
                        Datagram.java InboundStream.java UDPTransport.java
  Log:
  Implemented a Multicast based pub/sub subsystem targeted for building clustered 
applications.
  
  Revision  Changes    Path
  1.1                  jbossmq/src/main/org/jbossmq/cluster/udp/AdminPacketHandler.java
  
  Index: AdminPacketHandler.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.udp;
  
  import java.util.LinkedList;
  import java.net.DatagramSocket;
  import java.net.DatagramPacket;
  import java.io.ObjectOutputStream;
  import java.io.ByteArrayInputStream;
  import java.io.Externalizable;
  import java.io.ObjectInputStream;
  import java.io.IOException;
  import java.io.ByteArrayOutputStream;
  
  import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
  
  /**
   * This class manages the Admin topic.  The admin channel is used
   * for packet retransmisions.  When packets get sent, they are placed
   * on a packet cache.  Other nodes receiving packets may send a request
   * on the admin topic for this node to retransmit a packet.  If the packet
   * is in the cache, it is retransmited.  If not, a reply packet is sent
   * on the admin channel reporting that the message is not in the cache.
   *
   * This class works closely with the InboundStream class to implement
   * the error recovery stated above.
   *
   * It registers itself as a TopicListner in the Cluster.  All messages a
   * process synchronously so that we are running under the InboundStream
   * thread.  This allows us to work with InboundStream data w/out
   * synchronizing.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  class AdminPacketHandler implements org.jbossmq.cluster.TopicListener {
  
        // The transport owning this Inbound channel
        UDPTransport transport;
        // Used to for error recovery.
        LinkedList sentCache = new LinkedList();
        // Up to how many sent packets do we cache?
        int maxSentCacheSize = 500;
  
        //
        // This is the message sent to request a packet be resent
        //
        static class ResendAdminDatagram implements java.io.Serializable {
                ResendAdminDatagram(short i, int j) {
                        originator = i;
                        datagramId = j;
                }
                short originator;
                int datagramId;
        }
  
        //
        // This is the message sent when the requested packet is not in the cache.
        //
        static class ResendErrorAdminDatagram implements java.io.Serializable {
                ResendErrorAdminDatagram(short i, int j) {
                        originator = i;
                        datagramId = j;
                }
                short originator;
                int datagramId;
        }
  
        // Adds a packet to the cache
        void addToSentCache(Datagram dg) {
                synchronized (sentCache) {
                        sentCache.addFirst(dg);
                        if (sentCache.size() > maxSentCacheSize)
                                sentCache.removeLast();
                }
        }
  
        // Sends a resend request
        void requestResend(short nodeId, int messageId) {
  
                System.out.println("[Cluster] SENDING RESEND REQUEST: " + messageId);
                ResendAdminDatagram m = new ResendAdminDatagram(nodeId, messageId);
                try {
                        adminSend(m);
                } catch (Exception e) {
                        e.printStackTrace();
                }
  
        }
  
        // Resends the message if it was found in the cache
        void resendDatagram(int messageId) throws InterruptedException, 
java.io.IOException {
  
                LinkedList t;
                synchronized (sentCache) {
                        t = (LinkedList) sentCache.clone();
                }
                java.util.Iterator i = t.iterator();
                while (i.hasNext()) {
                        Datagram dg = (Datagram) i.next();
                        if (dg.getId() == messageId) {
                                System.out.println("[Cluster] RESEND REQUEST SERVICED: 
" + messageId);
                                System.out.println("Resending : " + messageId);
                                transport.send(dg);
                                return;
                        }
                }
  
                System.out.println("[Cluster] RESEND REQUEST NO SERVICED (not in 
cache): " + messageId);
                // The datagram had been removed from the cache...
                ResendErrorAdminDatagram error = new 
ResendErrorAdminDatagram(transport.cluster.getNodeId(), messageId);
                adminSend(error);
        }
  
        // This send is used to send the administrative messages.
        public void adminSend(Object message) throws IOException, InterruptedException 
{
                transport.cluster.send(transport.cluster.MANAGMENT_TOPIC, message, 
false, true, false);
        }
  
        /**
         * This method is called when a message is recived over the network
         * on the managment topic.
         */
        public void onMessage(short topic, short sender, java.lang.Object dg) {
  
                try {
  
                        if (dg instanceof ResendAdminDatagram) {
                                ResendAdminDatagram radg = (ResendAdminDatagram) dg;
                                if (radg.originator != transport.cluster.getNodeId())
                                        return;
                                System.out.println("[Cluster] RESEND REQUEST ARRIVED: 
" + radg.datagramId);
                                resendDatagram(radg.datagramId);
                        } else if (dg instanceof ResendErrorAdminDatagram) {
  
                                ResendErrorAdminDatagram readg = 
(ResendErrorAdminDatagram) dg;
                                System.out.println("[Cluster] RESEND REQUEST FAILED: " 
+ readg.datagramId);
                                
transport.inBoundStream.removeFromDatagramStream(readg.originator, readg.datagramId);
                        }
  
                } catch (IOException e) {
                        e.printStackTrace();
                } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                }
        }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/udp/Datagram.java
  
  Index: Datagram.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.udp;
  
  import org.jbossmq.cluster.SerializerUtil;
  
  /**
   * This class is used to access/build the datagram packets
   * sent by the UDP transport.  Since this later has to be turnned
   * into a byte[] to send over UDP, this class stores it's data
   * in a byte[] once it is constructed.  This speeds up serialization
   * since it is ready to be serialized after construction.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  class Datagram {
  
        // MESSAGE FLAGS
        final static int DROPPABLE_MESSAGE_FLAG = 0;
        final static int KEEP_ORDER_MESSAGE_FLAG = 1;
  
        // Field offsets in the header of the message
        final static int ID_OFFSET = 0; // int
        final static int LENGTH_OFFSET = 4; // int
        final static int FRAGMENT_ID_OFFSET = 8; // short
        final static int SENDER_ID_OFFSET = 10; // short
        final static int TOPIC_ID_OFFSET = 12; // short
        final static byte MESSAGE_FLAGS_OFFSET = 14; // byte
        final static int HEADER_SIZE = 15;
  
        // the serialized message 
        byte data[];
        // if this is part of a fragment, next fragment should
        // point to the next fragment.
        Datagram nextFragment;
  
        /**
         * This is used to reconstruct a datagram that was serialized
         */
        Datagram(byte datagram[], int length) {
                data = new byte[length];
                for (int i = 0; i < length; i++) {
                        data[i] = datagram[i];
                }
        }
  
        /**
         *  This is used to build a new datagram
         */
        Datagram(
                int id,
                int length,
                short fragmentId,
                short senderId,
                short topic,
                boolean droppable,
                boolean keepOrder,
                byte payload[],
                int offset,
                int len) {
  
                data = new byte[len + HEADER_SIZE];
                SerializerUtil.writeIntTo(id, data, ID_OFFSET);
                SerializerUtil.writeIntTo(length, data, LENGTH_OFFSET);
                SerializerUtil.writeShortTo(fragmentId, data, FRAGMENT_ID_OFFSET);
                SerializerUtil.writeShortTo(senderId, data, SENDER_ID_OFFSET);
                SerializerUtil.writeShortTo(topic, data, TOPIC_ID_OFFSET);
                data[MESSAGE_FLAGS_OFFSET] =
                        (byte) (((droppable ? 1 : 0) << DROPPABLE_MESSAGE_FLAG) | 
((keepOrder ? 1 : 0) << KEEP_ORDER_MESSAGE_FLAG));
  
                for (int i = HEADER_SIZE, j = offset; i < data.length; i++, j++)
                        data[i] = payload[j];
        }
  
        /**
         * Gets the original payload of the datagram.
         * @return byte[]
         */
        byte[] getData() {
                byte[] newData = new byte[data.length - HEADER_SIZE];
                for (int i = 0, j = HEADER_SIZE; j < data.length; i++, j++)
                        newData[i] = data[j];
                return newData;
        }
  
        /**
         * Calculates how many datagrams are needs to build the message.
         * This is dependent on the maxFragment size that is used.
         *
         * @return short
         */
        int getFragmentCount(int maxFragmentSize) {
                return getLength() / maxFragmentSize + (getLength() % maxFragmentSize 
== 0 ? 0 : 1);
        }
  
        short getFragmentId() {
                return SerializerUtil.readShortFrom(data, FRAGMENT_ID_OFFSET);
        }
  
        /**
         * The id of the message.  All the fragments message will have the
         * same Id, just different fragment ids.
         *
         * @return int
         */
        int getId() {
                return SerializerUtil.readIntFrom(data, ID_OFFSET);
        }
  
        /**
         * The total length of the message.
         * @return int
         */
        int getLength() {
                return SerializerUtil.readIntFrom(data, LENGTH_OFFSET);
        }
  
        /**
         * The id of the sender of this message.
         * @return int
         */
        short getSenderId() {
                return SerializerUtil.readShortFrom(data, SENDER_ID_OFFSET);
        }
  
        /**
         * The topic under which this message was sent.
         * @return short
         */
        short getTopicId() {
                return SerializerUtil.readShortFrom(data, TOPIC_ID_OFFSET);
        }
  
        boolean isDroppable() {
                return ((data[MESSAGE_FLAGS_OFFSET] >> DROPPABLE_MESSAGE_FLAG) & 0x01) 
!= 0;
        }
  
        boolean isKeepOrder() {
                return ((data[MESSAGE_FLAGS_OFFSET] >> KEEP_ORDER_MESSAGE_FLAG) & 
0x01) != 0;
        }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/udp/InboundStream.java
  
  Index: InboundStream.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.udp;
  
  import java.net.DatagramSocket;
  import java.net.DatagramPacket;
  import java.util.LinkedList;
  import java.util.Vector;
  import java.util.Iterator;
  
  /**
   * The InboundStream class blocks on the socket
   * waiting for input.  Since a message is make up 
   * of multiple fragments and UDP does not garantee message
   * delivery, we keep track of the "Stream" of message
   * being sent by all the nodes in the cluster.
   *
   * The "Stream" is a linked list of MessageState objects.
   * The head of the list is the oldest message that has not been completed
   * yet.  The tail of the list is the most recently received message from
   * the node.  Messages between the tail and head end could be completely
   * received or in progess.  Message marked as "not required to be in order"
   * are sent to the cluster to be processed by the listners as soon as they are 
   * completed, and removed form the "Stream".  When the head message is completed
   * it is sent up to the cluster to be processed and it is removed from the "stream"
   * When the head message is != to the tail message, this means that a datagram
   * was dropped and the head message needs to be resent, so a resend request is 
issued.
   * When a new message arrives it is placed at the end of the message stream but
   * filler message will be added to the stream to fill the space left by any dropped
   * messages.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  class InboundStream implements Runnable {
  
        // The channel manager owning this Inbound channel
        UDPTransport transport;
        // The thread that is reciving messages.
        Thread runningThread;
        // Used to stop the thread
        boolean done = false;
  
        // We keep it in a vector so we can do fast index based lookups.
        // We assume that nodeids will range from 0+
        Vector nodes = new Vector(10);
  
        // Keeps track of the progress of message reception
        static class MessageState {
                MessageState(int id) {
                        this.id = id;
                }
                int id;
                boolean arrived = false;
                boolean isDropable = false;
                long lastResendRequest = 0;
                Datagram[] fragments;
        }
  
        // Keeps track of the state of a remote nodes in the cluster
        static class NodeState {
                short nodeId;
                LinkedList datagramStream = new LinkedList();
                int lastDatagramId = 0;
                boolean pastFirstPacket = false;
        }
  
        // The maximum size the datagram stream can grow to.
        static final int MAX_DATAGRAM_STREAM_SIZE = 200;
  
        MessageState getMessageState(NodeState nodeState, int id) {
                boolean rollOver = id < -5000 && nodeState.lastDatagramId > 5000;
  
                if (!nodeState.pastFirstPacket) {
  
                        MessageState t = new MessageState(id);
                        nodeState.datagramStream.addLast(t);
                        nodeState.lastDatagramId = id;
                        nodeState.pastFirstPacket = true;
                        return t;
  
                } else if (nodeState.lastDatagramId == id) {
  
                        // The packet id is the same as the last packet.
                        return (MessageState) nodeState.datagramStream.getLast();
  
                } else if (nodeState.lastDatagramId < id || rollOver) {
  
                        // The packet id is newer than the last packet
                        int i = nodeState.lastDatagramId;
                        while (i != id) {
                                i++;
                                // Packets got dropped but we record them in the 
stream anyways.
                                MessageState t = new MessageState(i);
                                nodeState.datagramStream.addLast(t);
  
                                if (nodeState.datagramStream.size() > 
MAX_DATAGRAM_STREAM_SIZE) {
                                        handleNodeUnresponsive(nodeState);
                                        return null;
                                }
                        }
                        nodeState.lastDatagramId = id;
                        return (MessageState) nodeState.datagramStream.getLast();
  
                } else {
  
                        // The packet id older than the last packet.  We have to 
search for it
                        // int the datagram stream.
                        Iterator i = nodeState.datagramStream.iterator();
                        while (i.hasNext()) {
                                MessageState ds = (MessageState) i.next();
                                if (ds.id == id)
                                        return ds;
                        }
  
                        // This is weird.  A node might have retransmitted a packet 
for a
                        // message that was succesfully recived the first time 
(therefore
                        // we are not pending messages for it in the datagramStream)
                        return null;
  
                }
        }
  
        // We should do something else to inform the cluster that
        // a node is being unresponsive.  This could be due to node
        // failure or network failure.
        // This happens when the message stream gets too long.
        // TODO: make it also happen when a message sits too long on
        // the stream.
        void handleNodeUnresponsive(NodeState nodeState) {
                System.out.println("NODE IS UNRESPONSIVE!!!");
        }
  
        // Stops the input thread
        synchronized void start() {
                if (runningThread != null)
                        return;
                done = false;
                runningThread = new Thread(this, "UDPTransport Inbound Thread");
                runningThread.start();
        }
  
        // Starts the input thread
        synchronized void stop() throws InterruptedException {
                if (runningThread == null)
                        return;
                done = true;
                runningThread.interrupt();
                runningThread.join();
                runningThread = null;
        }
  
  
        /**
         * The thread of the InboundStream
         */
        public void run() {
  
                try {
  
                        byte buffer[] = new byte[transport.maxFragmentSize + 
Datagram.HEADER_SIZE];
                        DatagramPacket packet = new java.net.DatagramPacket(buffer, 
buffer.length);
  
                        // We iterate at least every 500 miliseconds so that we can
                        // dispatch resend request messages frequently
                        transport.socket.setSoTimeout(500);
  
                        while (!done) {
  
                                try {
                                        
                                        // We might be doing this too often..
                                        for (int i = 0; i < nodes.size(); i++) {
                                                NodeState nodeState = (NodeState) 
nodes.elementAt(i);
                                                if (nodeState != null)
                                                        dispatchMessages(nodeState);
                                        }
  
                                        // Read in a message from the network
                                        packet.setData(buffer);
                                        packet.setLength(buffer.length);
                                        transport.socket.receive(packet);
  
                                        // Is the packet ok?
                                        if (packet.getLength() < Datagram.HEADER_SIZE) 
{
                                                System.out.println("Packet was too 
small, dropping.");
                                                continue;
                                        }
  
                                        // Build the Datagram object form the network 
data
                                        Datagram dg = new Datagram(packet.getData(), 
packet.getLength());
  
                                        // Drop the packet if this node sent this 
packet
                                        if (dg.getSenderId() == 
transport.cluster.getNodeId()) {
                                                // This is a way to detect if another 
channel manager
                                                // is using our senderid.
                                                if (dg.getId() > 
transport.nextDatagramId) {
                                                        
transport.senderConflictEvent();
                                                }
                                                continue;
                                        }
  
                                        // Used to test the packet retransmision 
faclities
                                        // of the UDPTransport class.  We drop 1 in 10 
packets.
                                        java.util.Random r = new java.util.Random();
                                        if (r.nextInt(100) < 10) {
                                                System.out.println("Test packet DROP: 
" + dg.getId());
                                                continue;
                                        }
  
                                        // Continue processing the datagram
                                        processDatagram(dg);
  
                                } catch (java.io.InterruptedIOException e) {
                                } catch (InterruptedException e) {
                                }
                        }
  
                } catch (java.io.IOException e) {
                        e.printStackTrace();
                }
        }
        
        /**
         * This places the datagram in the proper position in the
         * message "Stream"
         */
        void processDatagram(Datagram dg) throws InterruptedException {
  
                short senderId = dg.getSenderId();
                int id = dg.getId();
                short fragmentId = dg.getFragmentId();
                NodeState nodeState = getNodeState(senderId);
                MessageState ds = getMessageState(nodeState, id);
  
                System.out.println("Processing datagram: " + id);
  
                // We might have allready received this message.
                if (ds == null || ds.arrived)
                        return;
  
                if (dg.getFragmentCount(transport.maxFragmentSize) == 1) {
                        // The entire message was contained within this datagram
                        ds.arrived = true;
                        ds.fragments = new Datagram[1];
                        ds.fragments[0] = dg;
                } else {
                        // The message is fragmented.
                        if (ds.fragments == null) {
                                ds.fragments = new 
Datagram[dg.getFragmentCount(transport.maxFragmentSize)];
                                ds.isDropable = dg.isDroppable();
                        }
  
                        ds.fragments[fragmentId] = dg;
  
                        // Check to see if we have recived all the fragments?
                        boolean messageLoaded = true;
                        // iterating backwards will break us out of the loop 
                        // quicker in the common case
                        for (int i = ds.fragments.length - 1; i >= 0; i--) {
                                if (ds.fragments[i] == null) {
                                        messageLoaded = false;
                                        break;
                                }
                        }
  
                        if (messageLoaded) {
                                // chain the fragments together
                                for (int i = 0; i < ds.fragments.length - 1; i++)
                                        ds.fragments[i].nextFragment = ds.fragments[i 
+ 1];
  
                                ds.arrived = true;
                        }
                }
        }
  
        /**
         * This method does the bulk of the "Stream" managment.
         * - it delivers messages that are complete.
         * - drops messages that are incomplete and droppable.
         * - Requests retransmision of the incomplete packet at the
         *   head of the stream.
         */
        void dispatchMessages(NodeState nodeState) throws InterruptedException {
  
                if (nodeState.datagramStream.size() == 0)
                        return;
  
                boolean atFront = true;
                Object lastDS = nodeState.datagramStream.getLast();
                Iterator i = nodeState.datagramStream.iterator();
                MessageState firstMissing = null;
                while (i.hasNext()) {
  
                        MessageState ds = (MessageState) i.next();
  
                        if (ds.arrived) {
                                if (atFront || !ds.fragments[0].isKeepOrder()) {
                                        transport.messageArrivedEvent(ds.fragments[0]);
                                        i.remove();
                                }
                                continue;
                        } else {
                                atFront = false;
  
                                if (ds == lastDS) {
                                        continue;
                                } else if (ds.isDropable) {
                                        i.remove();
                                } else {
                                        if (firstMissing == null) {
                                                firstMissing = ds;
                                        }
                                }
                        }
                }
  
                if (firstMissing != null && System.currentTimeMillis() - 
firstMissing.lastResendRequest > 100) {
  
                        transport.adminPacketHandler.requestResend(nodeState.nodeId, 
firstMissing.id);
                        firstMissing.lastResendRequest = System.currentTimeMillis();
                }
        }
  
        /**
         * Gets the NodeState for the given node id.
         * if none exists, a new NodeState object is created.
         */
        NodeState getNodeState(short nodeId) {
                if (nodeId >= nodes.size() || nodes.elementAt(nodeId) == null) {
                        if (nodeId >= nodes.size())
                                nodes.setSize(nodeId + 1);
  
                        NodeState t = new NodeState();
                        t.nodeId = nodeId;
                        nodes.add(nodeId, t);
                }
                return (NodeState) nodes.elementAt(nodeId);
        }
  
        /**
         * This is used by the AdminPacketHandler.  Called when
         * a packet cannot be retransmited by the host (It was droppable or
         * packet was not in the packet cache).
         *
         * We mark the packet as droppable so it gets removed from the stream
         * the next time the messages are dispatch on the stream.
         */
        void removeFromDatagramStream(short nodeId, int datagramId) {
                NodeState nodeState = getNodeState(nodeId);
  
                Iterator i = nodeState.datagramStream.iterator();
                while (i.hasNext()) {
                        MessageState ds = (MessageState) i.next();
                        if (ds.id == datagramId)
                                ds.isDropable = true;
                }
                
        }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/udp/UDPTransport.java
  
  Index: UDPTransport.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.udp;
  
  import java.net.DatagramSocket;
  import java.net.InetAddress;
  import java.net.MulticastSocket;
  import java.net.DatagramPacket;
  import java.util.Hashtable;
  import java.util.LinkedList;
  import java.io.IOException;
  
  import org.jbossmq.cluster.TransportListener;
  
  /**
   * This Transport implements a Datagram based cluster transport.
   * It Can use UDP or Multicast.
   *
   * As a Transport, it will fragment outbound messages that are
   * too large and recobine inbound message fragments.
   *
   * Since Datagrams have a chance of not making it, this class
   * also provides retransmision facilities.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public class UDPTransport implements org.jbossmq.cluster.Transport {
  
        // The class responsible for pulling messages from the network
        InboundStream inBoundStream = new InboundStream();
        // The interface into the cluster we report message arivals to
        TransportListener transportListener;
        // We incrementaly number the datagrams we send
        int nextDatagramId = 0;
  
        AdminPacketHandler adminPacketHandler = new AdminPacketHandler();
        org.jbossmq.cluster.Cluster cluster;
        // Adderes datagrams will be sent to
        InetAddress datagramAddDestination;
        // Port datagrams will be sent to
        int datagramPortDestination;
        // A message can be composed of multiple datagram fragments.
        int maxFragmentSize = 1024;
        // Socket that we will be sending data over.
        DatagramSocket socket;
        boolean started = false;
  
        // I like simple constructors.
        public UDPTransport() {
                inBoundStream.transport = this;
                adminPacketHandler.transport = this;
        }
  
        
        // I am assuming that the post increment is atomic, 
        // otherwise this needs to get synchronized
        int getNextDatagramId() {
                return nextDatagramId++;
        }
  
        // A complete datagram has arrived to us from the InputStream
        // We build a object from it and send it to the TransportListener
        void messageArrivedEvent(Datagram dg) throws InterruptedException {
  
                if (!cluster.isListeningOn(dg.getTopicId()))
                        return;
  
                byte data[];
                if (dg.getFragmentCount(maxFragmentSize) == 1) {
                        data = dg.getData();
                } else {
                        data = new byte[dg.getLength()];
                        int dataPos = 0;
                        while (dg != null) {
                                for (int i = dataPos, j = Datagram.HEADER_SIZE; j < 
dg.data.length; i++, j++)
                                        data[i] = dg.data[j];
                                dataPos += dg.data.length - Datagram.HEADER_SIZE;
                                dg = dg.nextFragment;
                        }
                }
  
                transportListener.messageArrivedEvent(dg.getTopicId(), 
dg.getSenderId(), data);
  
        }
  
        // This is set when the transport is set in the cluster.
        public void setCluster(org.jbossmq.cluster.Cluster newCluster) {
                cluster = newCluster;
                cluster.addTopicListener(cluster.MANAGMENT_TOPIC, adminPacketHandler);
        }
  
        // This is set when the transport is set in the cluster.
        public void setTransportListener(org.jbossmq.cluster.TransportListener c) {
                transportListener = c;
        }
  
        // This called when the InputStream detects that another node
        // in the cluster is using our nodeId.
        void senderConflictEvent() {
                System.out.println("WARNNING: NodeId conflict detected.");
        }
  
        // This method is used to initialize is
        // transport layer.
        synchronized public void setProperties(Hashtable t)
                throws org.jbossmq.cluster.InvalidConfigurationException, 
org.jbossmq.cluster.InvalidStateException {
  
                if (started)
                        throw new org.jbossmq.cluster.InvalidStateException("Transport 
is allready started");
  
                String transportMode = (String) t.get("TransportMode");
                String groupString = (String) t.get("Group");
                String useInterfaceString = (String) t.get("UseInterface");
                String portString = (String) t.get("Port");
  
                if (transportMode == null) {
                        throw new 
org.jbossmq.cluster.InvalidConfigurationException("TransportMode property not set");
                } else if (transportMode.equals("Multicast")) {
                        if (portString == null) {
                                throw new 
org.jbossmq.cluster.InvalidConfigurationException("Port property not set");
                        }
                        if (groupString == null) {
                                throw new 
org.jbossmq.cluster.InvalidConfigurationException("Group property not set");
                        }
                        try {
  
                                InetAddress group = InetAddress.getByName(groupString);
                                int port = Integer.parseInt(portString);
  
                                MulticastSocket s = new MulticastSocket(port);
                                s.joinGroup(group);
  
                                if (useInterfaceString != null) {
                                        
s.setInterface(InetAddress.getByName(useInterfaceString));
                                }
  
                                datagramPortDestination = port;
                                datagramAddDestination = group;
                                socket = s;
  
                        } catch (IOException e) {
                                throw new 
org.jbossmq.cluster.InvalidConfigurationException(
                                        "Ip group of the Group property was invalid: " 
+ e.getMessage());
                        }
                } else if (transportMode.equals("Broadcast")) {
                        if (portString == null) {
                                throw new 
org.jbossmq.cluster.InvalidConfigurationException("Port property not set");
                        }
  
                        try {
  
                                int port = Integer.parseInt(portString);
                                DatagramSocket s = new DatagramSocket(port);
                                InetAddress group = 
InetAddress.getByName("255.255.255.255");
  
                                datagramPortDestination = port;
                                datagramAddDestination = group;
                                socket = s;
  
                        } catch (IOException e) {
                                throw new 
org.jbossmq.cluster.InvalidConfigurationException("Port property was invalid: " + 
e.getMessage());
                        }
  
                } else {
                        throw new 
org.jbossmq.cluster.InvalidConfigurationException("TransportMode property invalid");
                }
        }
  
        // Starts the transport 
        synchronized public void start() throws 
org.jbossmq.cluster.InvalidStateException {
                if (socket == null)
                        throw new org.jbossmq.cluster.InvalidStateException("The 
transport properties have not been set yet.");
  
                started = true;
                inBoundStream.start();
        }
  
        // Stops the transport 
        synchronized public void stop() throws InterruptedException {
                inBoundStream.stop();
                started = false;
        }
  
        // Sends a datagram fragment chain over the network
        void send(Datagram dg) throws InterruptedException, java.io.IOException {
                // Send the fragment chain.
                adminPacketHandler.addToSentCache(dg);
                while (dg != null) {
                        DatagramPacket packet =
                                new java.net.DatagramPacket(dg.data, dg.data.length, 
datagramAddDestination, datagramPortDestination);
                        socket.send(packet);
                        dg = dg.nextFragment;
                }
        }
  
        // Builds a datagram fragment chain with the given data
        // and then sends it over the network
        public void send(short channelId, byte data[], boolean droppable, boolean 
keepOrder)
                throws IOException, InterruptedException {
                int id = getNextDatagramId();
                int length = data.length;
                short fragmentId = 0;
  
                Datagram firstFragment = null;
                Datagram lastFragment = null;
  
                // while the whole message has not been proccesed
                while (fragmentId * maxFragmentSize < length) {
  
                        Datagram dg;
                        // is what is left bigger than a fragment?
                        if ((length - (fragmentId * maxFragmentSize)) > 
maxFragmentSize) {
                                // This a MAX size fragment
                                dg =
                                        new Datagram(
                                                id,
                                                length,
                                                fragmentId,
                                                cluster.getNodeId(),
                                                channelId,
                                                droppable,
                                                keepOrder,
                                                data,
                                                fragmentId * maxFragmentSize,
                                                maxFragmentSize);
                        } else {
                                // This a not a MAX size fragment
                                dg =
                                        new Datagram(
                                                id,
                                                length,
                                                fragmentId,
                                                cluster.getNodeId(),
                                                channelId,
                                                droppable,
                                                keepOrder,
                                                data,
                                                fragmentId * maxFragmentSize,
                                                length - (fragmentId * 
maxFragmentSize));
                        }
                        fragmentId++;
  
                        // Chain the fragments together.
                        if (firstFragment == null) {
                                firstFragment = lastFragment = dg;
                        } else {
                                lastFragment.nextFragment = dg;
                                lastFragment = dg;
                        }
                }
  
                // Send the fragment chain.
                send(firstFragment);
        }
  }
  
  

Reply via email to