User: hiram   
  Date: 00/11/16 14:43:37

  Added:       src/java/org/spydermq/multiplexor DemuxInputStream.java
                        MultiplexorTest.java MuxOutputStream.java
                        SocketMultiplexor.java StreamMux.java
  Log:
  These classes provide a way to multiplex the use of a socket.
  
  Revision  Changes    Path
  1.1                  spyderMQ/src/java/org/spydermq/multiplexor/DemuxInputStream.java
  
  Index: DemuxInputStream.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.multiplexor;
  
  import java.io.InterruptedIOException;
  import java.io.IOException;
  import java.io.InputStream;
  
  /**
   * Objects of this class provide and an InputStream
   * from a StreamDemux.  
   *
   * Objects of this class are created by a StreamDemux object. 
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  class DemuxInputStream extends InputStream {
  
        StreamDemux streamDemux;
        short streamId;
        boolean atEOF = false;
  
        Object bufferMutex = new Object();
        byte buffer[];
        short bufferEndPos;
        short bufferStartPos;
        
        DemuxInputStream(StreamDemux demux, short id) {
                streamDemux = demux;
                streamId = id;
                buffer = new byte[1000];
                bufferStartPos = 0;
                bufferEndPos = 0;
        }
        
        public int available() throws IOException {
                return getBufferFillSize();
        }
        
        public void close() throws IOException {
                streamDemux.closeStream(streamId);
        }
        
        private int getBufferFillSize() {
                return bufferStartPos <= bufferEndPos ? bufferEndPos - bufferStartPos 
: buffer.length - (bufferStartPos - bufferEndPos);
        }
        
        private int getBufferFreeSize() {
                return (buffer.length - 1) - getBufferFillSize();
        }
        
        public void loadBuffer(byte data[], short dataLength) throws IOException {
                int freeSize = 0;
                int dataPos = 0;
                while (dataPos < dataLength) {
                        synchronized (bufferMutex) {
                                while ((freeSize = getBufferFreeSize()) == 0) {
                                        try {
                                                // Wait till the consumer notifies us 
he has
                                                // removed some data from the buffer.
                                                bufferMutex.wait();
                                        } catch (InterruptedException e) {
                                                throw new 
InterruptedIOException(e.getMessage());
                                        }
                                }
                                // the buffer should have free space now.
                                freeSize = Math.min(freeSize, dataLength - dataPos);
                                for (int i = 0; i < freeSize; i++) {
                                        buffer[bufferEndPos++] = data[dataPos + i];
                                        bufferEndPos = bufferEndPos == buffer.length ? 
0 : bufferEndPos;
                                }
                        }
                        dataPos += freeSize;
                        // the consumer might be waiting for bytes to come in
                        synchronized (bufferMutex) {
                                bufferMutex.notify();
                        }
                }
        }
        
        public int read() throws IOException {
                if (bufferStartPos == bufferEndPos && atEOF)
                        return -1;
                synchronized (bufferMutex) {
                        // Wait till the buffer has data
                        while (!atEOF && bufferStartPos == bufferEndPos && 
!streamDemux.pumpData(this)) {
                                try {
                                        // Wait till the producer notifies us he has
                                        // put some data in the buffer.
                                        bufferMutex.wait();
                                } catch (InterruptedException e) {
                                        throw new 
InterruptedIOException(e.getMessage());
                                }
                        }
                }
                // We might break out due to EOF
                if (bufferStartPos == bufferEndPos)
                        return -1;
                // the buffer should have data now.
                byte result = buffer[bufferStartPos++];
                bufferStartPos = bufferStartPos == buffer.length ? 0 : bufferStartPos;
                // the producer might be waiting for free space in the
                // buffer, we have to notify him.
                synchronized (bufferMutex) {
                        bufferMutex.notify();
                }
                return result & 0xff;
        }
        
        public int read(byte b[], int off, int len) throws IOException {
                if (b == null) {
                        throw new NullPointerException();
                } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) 
> b.length) || ((off + len) < 0)) {
                        throw new IndexOutOfBoundsException();
                } else if (len == 0) {
                        return 0;
                }
                
                int c = read();
                if (c == -1) {
                        return -1;
                }
                b[off] = (byte) c;
                len = Math.min(available(), len);
                int i = 1;
                try {
                        for (; i < len; i++) {
                                c = read();
                                b[off + i] = (byte) c;
                        }
                } catch (IOException ee) {
                }
                return i;
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/multiplexor/MultiplexorTest.java
  
  Index: MultiplexorTest.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.multiplexor;
  
  import java.io.ObjectInputStream;
  import java.io.OutputStream;
  import java.io.IOException;
  import java.io.BufferedInputStream;
  import java.io.PipedOutputStream;
  import java.io.ObjectOutputStream;
  import java.io.InterruptedIOException;
  import java.io.BufferedOutputStream;
  import java.io.PipedInputStream;
  
  /**
   * This class is a unit tester of the
   * StreamMux and StreamDemux classes.
   *
   * Starts 3 concurrent readers and 3 concurrent
   * writers using 3 fully buffered object streams multiplexed over
   * a single Pipe(Input/Output)Stream.  The
   * writers send a 10K message with a timestamp.
   *
   * Readers display how long the message took to arrive.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class MultiplexorTest {

        StreamMux mux;
        StreamDemux demux;

        public static final int PAY_LOAD_SIZE = 1024 * 10;
        // Filled in by the constructor; shared by all writer threads.
        public static char[] PAY_LOAD;

        /**
         * Writes 1000 timestamped copies of PAY_LOAD to one multiplexed
         * stream.
         */
        class WriterThread extends Thread {
                ObjectOutputStream os;
                short id;

                /**
                 * Opens the buffered object stream for the given stream id and
                 * flushes it so the object stream header is sent right away.
                 */
                WriterThread(short id) throws IOException {
                        super("WriterThread");
                        this.os = new ObjectOutputStream(
                                new BufferedOutputStream(mux.getStream(id)));
                        this.os.flush();
                        // remember the stream id, as ReaderThread does
                        this.id = id;
                }

                public void run() {
                        try {
                                for (int i = 0; i < 1000; i++) {
                                        os.writeLong(System.currentTimeMillis());
                                        os.writeObject(PAY_LOAD);
                                        os.flush();
                                }
                        } catch (IOException e) {
                                e.printStackTrace();
                        }
                }
        }

        /**
         * Reads 1000 messages from one demultiplexed stream and prints the
         * latency of each.
         */
        class ReaderThread extends Thread {
                ObjectInputStream is;
                short id;

                /**
                 * Opens the buffered object stream for the given stream id.
                 */
                ReaderThread(short id) throws IOException {
                        super("ReaderThread");
                        this.is = new ObjectInputStream(
                                new BufferedInputStream(demux.getStream(id)));
                        this.id = id;
                }

                public void run() {
                        try {
                                for (int i = 0; i < 1000; i++) {
                                        long t = is.readLong();
                                        is.readObject();
                                        // elapsed milliseconds since the writer stamped the packet
                                        t = System.currentTimeMillis() - t;
                                        // printed in seconds
                                        System.out.println("" + id + ": Packet " + i
                                                + " Latency : " + ((double) t / (double) 1000));
                                        System.out.flush();
                                }
                        } catch (Exception e) {
                                e.printStackTrace();
                        }
                }
        }

        /**
         * MultiplexorTest constructor.  Builds the shared payload: a
         * PAY_LOAD_SIZE character array cycling through 'A'..'Z'.
         */
        public MultiplexorTest() {
                super();

                char s[] = new char[PAY_LOAD_SIZE];
                char c = 'A';
                for (int i = 0; i < PAY_LOAD_SIZE; i++) {
                        s[i] = c;
                        c++;
                        c = c > 'Z' ? 'A' : c;
                }
                PAY_LOAD = s;
        }

        /**
         * Connects a StreamMux to a StreamDemux through an in-memory pipe.
         */
        public void connect() throws IOException {
                PipedInputStream pis = new PipedInputStream();
                PipedOutputStream pos = new PipedOutputStream(pis);

                mux = new StreamMux(pos);
                demux = new StreamDemux(pis);
        }

        /**
         * Runs the test with three streams (ids 1, 2 and 3).
         */
        public static void main(String args[]) throws Exception {

                System.out.println("Initializing");
                MultiplexorTest tester = new MultiplexorTest();

                System.out.println("Connecting the streams");
                tester.connect();
                System.out.println("Starting stream 1");
                tester.startStream((short) 1);
                System.out.println("Starting stream 2");
                tester.startStream((short) 2);
                System.out.println("Starting stream 3");
                tester.startStream((short) 3);

        }

        /**
         * Starts one writer thread and one reader thread for the given
         * stream id.  The writer is created first.
         */
        public void startStream(short id) throws IOException {

                new WriterThread(id).start();
                new ReaderThread(id).start();

        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/multiplexor/MuxOutputStream.java
  
  Index: MuxOutputStream.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.multiplexor;
  
  import java.io.OutputStream;
  import java.io.InterruptedIOException;
  import java.io.IOException; 
  
  /**
   * Objects of this class provide an OutputStream
   * to a StreamMux.
   *
   * Objects of this class are created by a StreamMux object.
   *
   * Bytes are collected in a buffer of one frame (streamMux.frameSize
   * bytes) and handed to the StreamMux when the buffer is full, on
   * flush(), or on close().
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  class MuxOutputStream extends OutputStream {

        StreamMux streamMux;
        short streamId;

        // Pending bytes of the current frame and how many of them are valid.
        byte buffer[];
        short bufferLength;

        /**
         * Creates the output side of one multiplexed stream.  The buffer is
         * sized to the frame size the multiplexor has at this moment.
         *
         * @param mux the multiplexor that frames are written to
         * @param id the id of this stream
         */
        MuxOutputStream(StreamMux mux, short id) {
                streamMux = mux;
                streamId = id;
                buffer = new byte[streamMux.frameSize];
                bufferLength = 0;
        }

        /**
         * Closes this output stream and releases any system resources.
         *
         * Bytes still waiting in the frame buffer are handed to the
         * multiplexor first, so nothing written before close() is lost.
         */
        public void close() throws IOException {
                if (bufferLength > 0) {
                        flushBuffer();
                }
                streamMux.closeStream(streamId);
        }

        /**
         * Flushes this output stream and forces any buffered output bytes 
         * to be written out. 
         */
        public void flush() throws IOException {
                if (bufferLength > 0) {
                        flushBuffer();
                }
                streamMux.flush();
        }

        /**
         * Flushes the internal buffer to the multiplexor.
         */
        private void flushBuffer() throws IOException {
                streamMux.write(this, buffer, bufferLength);
                bufferLength = 0;
        }

        /**
         * Appends one byte to the frame buffer and sends the frame to the
         * multiplexor as soon as it is full.
         *
         * @param data the byte to write; only the low 8 bits are used
         */
        public void write(int data) throws IOException {
                byte b = (byte) data;
                buffer[bufferLength] = b;
                bufferLength++;

                if (bufferLength == streamMux.frameSize) {
                        flushBuffer();
                }
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/multiplexor/SocketMultiplexor.java
  
  Index: SocketMultiplexor.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.multiplexor;
  
  import java.net.Socket;
  import java.io.OutputStream;
  import java.io.IOException;
  import java.io.InputStream;
  
  /**
   * Used to multiplex a socket's streams.
   *
   * With this class you can access the multiplexed
   * streams of the socket.  The multiplexed streams are
   * identified by a stream id.
   *
   * Stream id 0 is reserved for internal use of the multiplexor.
   * 
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SocketMultiplexor {

        StreamMux mux;
        StreamDemux demux;
        private final Socket socket;

        /**
         * Wraps the socket: a StreamMux is placed on its output stream and
         * a StreamDemux on its input stream.
         *
         * @param s the socket whose streams are multiplexed
         */
        public SocketMultiplexor(Socket s) throws IOException {
                this.socket = s;
                this.mux = new StreamMux(s.getOutputStream());
                this.demux = new StreamDemux(s.getInputStream());
        }

        /**
         * @return the socket given to the constructor
         */
        public Socket getSocket() {
                return this.socket;
        }

        /**
         * @return the org.spydermq.multiplexor.StreamMux writing to the socket
         */
        public StreamMux getMux() {
                return this.mux;
        }

        /**
         * @return the org.spydermq.multiplexor.StreamDemux reading from the socket
         */
        public StreamDemux getDemux() {
                return this.demux;
        }

        /**
         * Returns the output side of the multiplexed stream with the
         * given id.
         *
         * @param id the stream id; it is narrowed to a short
         */
        public OutputStream getOutputStream(int id) throws IOException {
                short streamId = (short) id;
                return this.mux.getStream(streamId);
        }

        /**
         * Returns the input side of the multiplexed stream with the
         * given id.
         *
         * @param id the stream id; it is narrowed to a short
         */
        public InputStream getInputStream(int id) throws IOException {
                short streamId = (short) id;
                return this.demux.getStream(streamId);
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/multiplexor/StreamMux.java
  
  Index: StreamMux.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.multiplexor;
  
  import java.io.*;
  import java.util.*;
  
  /**
   * This class is used to multiplex from
   * multiple streams into a single stream.
   * 
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class StreamMux {
  
        short frameSize = 512;
        HashMap openStreams = new HashMap();
        OutputStream out;
        DataOutputStream objectOut;
  
        // Commands that can be sent over the admin stream.
        static final byte OPEN_STREAM_COMMAND = 0;
        static final byte CLOSE_STREAM_COMMAND = 1;
        static final byte NEXT_FRAME_SHORT_COMMAND = 2;
  
        /**
         * StreamMux constructor comment.
         * @param out java.io.OutputStream
         */
        public StreamMux(OutputStream out) throws IOException {
                this.out = out;
                this.objectOut = new DataOutputStream(out);
        }
  
        void closeStream(short id) throws IOException {
                if (id == 0)
                        throw new IOException("Stream id 0 is reserved for internal 
use.");
  
                MuxOutputStream s;
  
                synchronized (openStreams) {
                        s = (MuxOutputStream) openStreams.remove(new Short(id));
                }
  
                synchronized (objectOut) {
                        objectOut.writeShort(0); // admin stream
                        objectOut.writeByte(CLOSE_STREAM_COMMAND); // command
                        objectOut.writeShort(id); // argument
                }
        }
  
        public void flush() throws IOException {
  
                synchronized (objectOut) {
                        objectOut.flush();
                        out.flush();
                }
  
        }
  
        /**
         * Insert the method's description here.
         * Creation date: (11/15/00 5:30:55 PM)
         * @return short
         */
        public short getFrameSize() {
                synchronized (openStreams) {
                        return frameSize;
                }
        }
  
        public OutputStream getStream(short id) throws IOException {
                if (id == 0)
                        throw new IOException("Stream id 0 is reserved for internal 
use.");
  
                OutputStream s;
                synchronized (openStreams) {
                        s = (OutputStream) openStreams.get(new Short(id));
                        if (s != null) {
                                return s;
                        }
  
                        s = new MuxOutputStream(this, id);
                        openStreams.put(new Short(id), s);
                }
  
                synchronized (objectOut) {
                        objectOut.writeShort(0); // admin stream
                        objectOut.writeByte(OPEN_STREAM_COMMAND); // command
                        objectOut.writeShort(id); // argument
                }
                return s;
        }
  
        /**
         * Insert the method's description here.
         * Creation date: (11/15/00 5:30:55 PM)
         * @param newFrameSize short
         */
        public void setFrameSize(short newFrameSize) throws IOException {
                synchronized (openStreams) {
                        if (openStreams.size() > 0)
                                throw new IOException("Cannot change the frame size 
while there are open streams.");
                        frameSize = newFrameSize;
                }
        }
  
        void write(MuxOutputStream s, byte b[], int len) throws IOException {
  
                if (b == null) {
                        throw new NullPointerException();
                } else if ((len < 0) || (len > b.length) || (len > frameSize)) {
                        throw new IndexOutOfBoundsException();
                } else if (len == 0) {
                        return;
                }
  
                synchronized (objectOut) {
                        if (len < frameSize) {
                                objectOut.writeShort(0);
                                objectOut.writeByte(NEXT_FRAME_SHORT_COMMAND);
                                objectOut.writeShort(len);
                        }
                        objectOut.writeShort(s.streamId);
                        objectOut.write(b, 0, len);
                }
        }
  }
  
  
  

Reply via email to