Author: jvermillard
Date: Wed Jan 10 00:13:43 2007
New Revision: 494760

URL: http://svn.apache.org/viewvc?view=rev&rev=494760
Log:
first version with 1 read  and 1 write thread per serial connection

Modified:
    
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
    
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java
    
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java
    
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java

Modified: 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
 (original)
+++ 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
 Wed Jan 10 00:13:43 2007
@@ -5,14 +5,16 @@
 import java.util.Enumeration;
 import java.util.TooManyListenersException;
 
-import javax.comm.CommPortIdentifier;
-import javax.comm.PortInUseException;
-import javax.comm.SerialPort;
-import javax.comm.UnsupportedCommOperationException;
+import gnu.io.CommPortIdentifier;
+import gnu.io.PortInUseException;
+import gnu.io.SerialPort;
+import gnu.io.UnsupportedCommOperationException;
 
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.common.support.IoServiceListenerSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +37,13 @@
        protected Class<? extends IoSessionConfig> getSessionConfigType() {
                return SerialSessionConfig.class;
        }
+       
+       @Override       
+    protected IoServiceListenerSupport getListeners()
+    {
+        return super.getListeners();
+    }
+
 
        @Override
        protected ConnectFuture doConnect(SocketAddress remoteAddress, 
SocketAddress localAddress) {
@@ -74,28 +83,35 @@
        
                                                // TODO : receive threshold
                                                //      
serialPort.enableReceiveThreshold(receiveThreshold);  /* bytes */
-                                      // TODO : reveive Timeout 
serialPort.enableReceiveTimeout(10); /* milliseconds */
-
-                                               // TODO : create serial session
-                                               outputStream = serialPort
-                                                               
.getOutputStream();
-                                               inputStream = 
serialPort.getInputStream();
+                                       // TODO : reveive Timeout 
serialPort.enableReceiveTimeout(10); /* milliseconds */
 
                                                
serialPort.notifyOnDataAvailable(true);
-                                               
serialPort.addEventListener(this);
                                                
+                                               ConnectFuture future = new 
DefaultConnectFuture();
+                                               SerialSession session = new 
SerialSession(this,portAddress,serialPort);
+                                               session.start();
+                                               future.setSession( session );
+                               return future;
                                        } catch (PortInUseException e) {
-                                               e.printStackTrace();
-                                       } catch 
(UnsupportedCommOperationException e1) {
-                                               e1.printStackTrace();
+                                               if(log.isDebugEnabled())
+                                                       log.debug("Port In Use 
Exception : ",e);
+                                               return 
DefaultConnectFuture.newFailedFuture(e);                                        
 
+                                       } catch 
(UnsupportedCommOperationException e) {
+                                               if(log.isDebugEnabled())
+                                                       log.debug("Comm 
Exception : ",e);
+                                               return 
DefaultConnectFuture.newFailedFuture(e);
                                        } catch (IOException e) {
-                                               e.printStackTrace();
+                                               if(log.isDebugEnabled())
+                                                       log.debug("IOException 
: ",e);
+                                               return 
DefaultConnectFuture.newFailedFuture(e);
                                        } catch (TooManyListenersException e) {
-                                               e.printStackTrace();
+                                               if(log.isDebugEnabled())
+                                                       
log.debug("TooManyListenersException : ",e);
+                                               return 
DefaultConnectFuture.newFailedFuture(e);
                                        }
                                }
                        }
                }
-               return null;
+               return DefaultConnectFuture.newFailedFuture(new 
RuntimeException("Serial port not found"));
        }
-}
+}
\ No newline at end of file

Modified: 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java
 (original)
+++ 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java
 Wed Jan 10 00:13:43 2007
@@ -1,5 +1,8 @@
 package org.apache.mina.transport.serial;
 
+import java.util.Queue;
+
+import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.AbstractIoFilterChain;
@@ -12,14 +15,27 @@
 
        @Override
        protected void doClose(IoSession session) throws Exception {
-               // TODO Auto-generated method stub
-               
+               ((SerialSession)session).closeSerialPort();
        }
 
        @Override
        protected void doWrite(IoSession session, WriteRequest writeRequest) 
throws Exception {
-               // TODO Auto-generated method stub
-               
+               SerialSession s=(SerialSession)session;
+               Queue<WriteRequest> queue = s.getWriteRequestQueue();
+
+               ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
+        // SocketIoProcessor.doFlush() will reset it after write is finished
+        // because the buffer will be passed with messageSent event. 
+        ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
+        synchronized( queue )
+        {
+            queue.offer( writeRequest );
+            if( queue.size() == 1 && session.getTrafficMask().isWritable() )
+            {
+                // Notify serial session worker only when writeRequestQueue 
was empty.
+                s.notifyWriteWorker();
+            }
+        }
        }
 
 }

Modified: 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java
 (original)
+++ 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java
 Wed Jan 10 00:13:43 2007
@@ -24,7 +24,6 @@
 import java.security.InvalidParameterException;
 
 import javax.comm.SerialPort;
-import javax.naming.directory.InvalidAttributesException;
 
 public class SerialPortAddress extends SocketAddress {
        
@@ -109,13 +108,6 @@
        public String toString() {
                return 
"serial("+name+",bauds:"+bauds+",databits:"+dataBits+",stopbits:"+stopBits+",parity:"+parity+",flowcontrol:"+flowControl+")";
        }
-       
-       public static void main(String[] args) {
-               SerialPortAddress addy=new 
SerialPortAddress("/dev/ttyS0",9600,DataBits.DATABITS_8,StopBits.STOP_BITS_1,Parity.PARITY_NONE,FlowControl.FLOWCONTROL_NONE);
-               System.err.println("serial : "+addy);
-       
-       }
-       
        
        int getDataBitsForRXTX()
        {

Modified: 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
 (original)
+++ 
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
 Wed Jan 10 00:13:43 2007
@@ -1,9 +1,19 @@
 package org.apache.mina.transport.serial;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.SocketAddress;
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.TooManyListenersException;
 
+import gnu.io.SerialPort;
+import gnu.io.SerialPortEvent;
+import gnu.io.SerialPortEventListener;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
@@ -11,28 +21,46 @@
 import org.apache.mina.common.TransportType;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.BaseIoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class SerialSession extends BaseIoSession {
+public class SerialSession extends BaseIoSession implements
+               SerialPortEventListener {
 
        private SerialSessionConfig config;
+
        private IoHandler ioHandler;
+
        private IoFilterChain filterChain;
+
        private IoService service;
+
        private SerialPortAddress address;
-    private final Queue<WriteRequest> writeRequestQueue;
-       
-       public SerialSession(IoService service, SerialPortAddress address) 
-       {
+
+       private final Queue<WriteRequest> writeRequestQueue;
+
+       private InputStream inputStream;
+
+       private OutputStream outputStream;
+
+       private SerialPort port;
+
+       private Logger log;
+
+       SerialSession(IoService service, SerialPortAddress address, SerialPort 
port) {
                this.service = service;
                this.ioHandler = service.getHandler();
-               this.filterChain = new SerialFilterChain( this );
+               this.filterChain = new SerialFilterChain(this);
                this.writeRequestQueue = new LinkedList<WriteRequest>();
+               this.port = port;
+
+               log = LoggerFactory.getLogger(SerialSession.class);
        }
-       
+
        @Override
        protected void updateTrafficMask() {
                // TODO Auto-generated method stub
-               
+
        }
 
        public IoSessionConfig getConfig() {
@@ -55,14 +83,27 @@
                return address;
        }
 
-       public int getScheduledWriteBytes() {
-               // TODO Auto-generated method stub
-               return 0;
+       Queue<WriteRequest> getWriteRequestQueue() {
+               return writeRequestQueue;
        }
 
        public int getScheduledWriteMessages() {
-               // TODO Auto-generated method stub
-               return 0;
+               synchronized (writeRequestQueue) {
+                       return writeRequestQueue.size();
+               }
+       }
+
+       public int getScheduledWriteBytes() {
+               int size = 0;
+               synchronized (writeRequestQueue) {
+                       for (Object o : writeRequestQueue) {
+                               if (o instanceof ByteBuffer) {
+                                       size += ((ByteBuffer) o).remaining();
+                               }
+                       }
+               }
+
+               return size;
        }
 
        public IoService getService() {
@@ -71,5 +112,148 @@
 
        public TransportType getTransportType() {
                return TransportType.getInstance("SERIAL");
+       }
+
+       protected void close0()
+    {
+        filterChain.fireFilterClose( this );
+    }
+       
+       /**
+        * start handling streams
+        * 
+        * @throws IOException
+        * @throws TooManyListenersException
+        */
+       void start() throws IOException, TooManyListenersException {
+               inputStream = port.getInputStream();
+               outputStream = port.getOutputStream();
+               ReadWorker w = new ReadWorker();
+               w.start();
+               port.addEventListener(this);
+               
((SerialConnector)getService()).getListeners().fireSessionCreated(this);
+       }
+
+       private Object writeMonitor = new Object();
+
+       private WriteWorker writeWorker;
+
+       private class WriteWorker extends Thread {
+               public void run() {
+                       while (isConnected() && !isClosing()) {
+                               flushWrites();
+
+                               // wait for more data
+                               try {
+                                       writeMonitor.wait();
+                               } catch (InterruptedException e) {
+                                       log.error("InterruptedException", e);
+                               }
+                       }
+               }
+       }
+
+       private void flushWrites() {
+               for (;;) {
+                       WriteRequest req;
+
+                       synchronized (writeRequestQueue) {
+                               req = (WriteRequest) writeRequestQueue.peek();
+                       }
+
+                       if (req == null)
+                               break;
+
+                       ByteBuffer buf = (ByteBuffer) req.getMessage();
+                       if (buf.remaining() == 0) {
+                               synchronized (writeRequestQueue) {
+                                       writeRequestQueue.poll();
+                               }
+                               this.increaseWrittenMessages();
+
+                               buf.reset();
+
+                               this.getFilterChain().fireMessageSent(this, 
req);
+                               continue;
+                       }
+
+                       int writtenBytes = buf.remaining();
+                       try {
+                               outputStream.write(buf.array());
+                               this.increaseWrittenBytes(writtenBytes);
+                       } catch (IOException e) {
+                               this.getFilterChain().fireExceptionCaught(this, 
e);
+                       }
+               }
+       }
+
+       void notifyWriteWorker() {
+               if (writeWorker == null) {
+                       writeWorker = new WriteWorker();
+                       writeWorker.start();
+               } else {
+                       synchronized (writeMonitor) {
+                               writeMonitor.notifyAll();
+                       }
+               }
+       }
+
+       private Object readReadyMonitor = new Object();
+
+       private class ReadWorker extends Thread {
+               @Override
+               public void run() {
+                       while (isConnected() && !isClosing()) {
+                               synchronized (readReadyMonitor) {
+                                       try {
+                                               readReadyMonitor.wait();
+                                       } catch (InterruptedException e) {
+                                               
log.error("InterruptedException", e);
+                                       }
+                                       int dataSize;
+                                       try {
+                                               dataSize = 
inputStream.available();
+                                               byte[] data = new 
byte[dataSize];
+                                               int readBytes = 
inputStream.read(data);
+
+                                   if( readBytes > 0 )
+                                   {
+                                       increaseReadBytes( readBytes );
+                                       // TODO : check if it's the good 
allocation way
+                                       ByteBuffer buf = ByteBuffer.allocate( 
readBytes );
+                                       buf.put(data,0,readBytes);
+                                       getFilterChain().fireMessageReceived( 
SerialSession.this, buf );
+                                   }
+                                       } catch (IOException e) {
+                                               
getFilterChain().fireExceptionCaught(
+                                                               
SerialSession.this, e);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       public void serialEvent(SerialPortEvent evt) {
+               if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
+                       synchronized (readReadyMonitor) {
+                               readReadyMonitor.notifyAll();
+                       }
+               }
+       }
+
+       public void closeSerialPort() {
+               try {
+                       inputStream.close();
+               } catch (IOException e) {
+                       ExceptionMonitor.getInstance().exceptionCaught(e);
+               }
+               try {
+                       outputStream.close();
+               } catch (IOException e) {
+                       ExceptionMonitor.getInstance().exceptionCaught(e);
+               }
+               
+               port.close();
+               
((SerialConnector)getService()).getListeners().fireSessionDestroyed(this);
        }
 }


Reply via email to