Author: gsim
Date: Mon Dec 10 22:14:29 2012
New Revision: 1419837

URL: http://svn.apache.org/viewvc?rev=1419837&view=rev
Log:
PROTON-118: align driver more closely with c equivalent

Modified:
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Connector.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Driver.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Listener.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Connector.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Connector.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Connector.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Connector.java
 Mon Dec 10 22:14:29 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.proton.driver;
 
+import java.io.IOException;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Sasl;
 
@@ -36,7 +37,7 @@ public interface Connector<C>
      * the connector.
      *
      */
-    void process();
+    void process() throws IOException;
 
     /** Access the listener which opened this connector.
      *

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Driver.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Driver.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Driver.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Driver.java
 Mon Dec 10 22:14:29 2012
@@ -51,7 +51,7 @@ public interface Driver
      * @param timeout maximum time in milliseconds to wait.
      *                0 means infinite wait
      */
-    void doWait(int timeout);
+    void doWait(long timeout);
 
     /**
      * Get the next listener with pending data in the driver.
@@ -120,4 +120,14 @@ public interface Driver
      * @return a new connector to the given host:port, NULL if error.
      */
     <C> Connector<C> createConnector(SelectableChannel fd, C context);
+
+    /**
+     * Return an iterable over all listeners.
+     */
+    Iterable<Listener> listeners();
+    /**
+     * Return an iterable over all connectors.
+     */
+    Iterable<Connector> connectors();
+
 }

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Listener.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Listener.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Listener.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Listener.java
 Mon Dec 10 22:14:29 2012
@@ -49,13 +49,5 @@ public interface Listener<C>
      * Close the socket used by the listener.
      *
      */
-    void close();
-
-    /**
-     * Destructor for the given listener.
-     *
-     * Assumes the listener's socket has been closed prior to call.
-     *
-     */
-    void destroy();
+    void close() throws java.io.IOException;
 }

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
 Mon Dec 10 22:14:29 2012
@@ -29,13 +29,11 @@ import org.apache.qpid.proton.driver.Con
 import org.apache.qpid.proton.driver.Listener;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Sasl.SaslState;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.impl.TransportFactory;
 
 class ConnectorImpl<C> implements Connector<C>
 {
-    public static int END_OF_STREAM = -1;
     private static int DEFAULT_BUFFER_SIZE = 64 * 1024;
     private static int readBufferSize = Integer.getInteger
         ("pn.receive_buffer_size", DEFAULT_BUFFER_SIZE);
@@ -44,7 +42,6 @@ class ConnectorImpl<C> implements Connec
 
     enum ConnectorState {UNINITIALIZED, OPENED, EOS, CLOSED};
 
-    private final Sasl _sasl;
     private final DriverImpl _driver;
     private final Listener<C> _listener;
     private final SocketChannel _channel;
@@ -52,218 +49,111 @@ class ConnectorImpl<C> implements Connec
     private C _context;
 
     private Connection _connection;
+    private Transport _transport = null;
     private SelectionKey _key;
     private ConnectorState _state = UNINITIALIZED;
 
     private ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize);
-    private int _bytesNotRead = 0;
-
-    private int _bytesNotWritten = 0;
     private ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize);
-    private Transport _transport = null;
+    private boolean _readPending;
 
-    ConnectorImpl(DriverImpl driver, Listener<C> listener, Sasl sasl, 
SocketChannel c, C context, SelectionKey key)
+    ConnectorImpl(DriverImpl driver, Listener<C> listener, SocketChannel c, C 
context, SelectionKey key)
     {
         _driver = driver;
         _listener = listener;
         _channel = c;
-        _sasl = sasl;
         _context = context;
         _key = key;
     }
 
-    public void process()
+    void selected()
     {
-        if (_channel.isConnectionPending())
-        {
-            try
-            {
-                _channel.finishConnect();
-            }
-            catch (IOException io)
-            {
-                throw new RuntimeException("Exception will trying to complete 
connection",io);
-            }
-        }
-
-        if (!_channel.isOpen())
-        {
-            _state = ConnectorState.CLOSED;
-            return;
-        }
-
-        if (_key.isReadable())
-        {
-            read();
-        }
-        write();
+        _readPending = true;
     }
 
-    void read()
+    public void process() throws IOException
     {
-        try
+        if (_channel.isOpen() && _channel.finishConnect())
         {
-            int  bytesRead = _channel.read(_readBuffer);
-            int consumed = 0;
-            while (bytesRead > 0)
-            {
-                consumed = processInput(_readBuffer.array(), 0, bytesRead + 
_bytesNotRead);
-                if (consumed < bytesRead)
-                {
-                    _readBuffer.compact();
-                    _bytesNotRead = bytesRead - consumed;
-                }
-                else
-                {
-                    _readBuffer.rewind();
-                    _bytesNotRead = 0;
-                }
-                bytesRead = _channel.read(_readBuffer);
-            }
-            if (bytesRead == -1)
+            if (_readPending)
             {
-                _state = ConnectorState.EOS;
+                read();
+                _readPending = false;
+                if (isClosed()) return;
             }
-        }
-        catch (IOException e)
-        {
-            _logger.log(Level.SEVERE, "Exception when reading from 
connection",e);
+            write();
         }
     }
 
-    void write()
+    void read() throws IOException
     {
-        try
+        int bytesRead = 0;
+        while ((bytesRead = _channel.read(_readBuffer)) > 0)
         {
-            processOutput();
-            if (_bytesNotWritten > 0)
+            _readBuffer.flip();
+            int consumed = _transport.input(_readBuffer.array(), 
_readBuffer.position(), _readBuffer.limit());
+            _readBuffer.position(consumed == Transport.END_OF_STREAM ? 
_readBuffer.limit() : consumed);
+            if (_logger.isLoggable(Level.FINE))
             {
-                _writeBuffer.limit(_bytesNotWritten);
-                int written = _channel.write(_writeBuffer);
-                if (_writeBuffer.hasRemaining())
-                {
-                    _writeBuffer.compact();
-                    _bytesNotWritten = _bytesNotWritten - written;
-                }
-                else
-                {
-                    _writeBuffer.clear();
-                    _bytesNotWritten = 0;
-                }
-                if (_bytesNotWritten > 0) // couldn't write all the data, need 
to know when we could write again.
-                {
-                    _key.interestOps(_key.interestOps() | 
SelectionKey.OP_WRITE);
-                }
-                else if ((_key.interestOps() & SelectionKey.OP_WRITE) != 0)
-                {
-                    _key.interestOps(_key.interestOps() & 
~SelectionKey.OP_WRITE);                    
-                }
+                _logger.log(Level.FINE, "consumed " + consumed + " bytes, " + 
_readBuffer.remaining() + " available");
             }
+            _readBuffer.compact();
         }
-        catch (IOException e)
-        {
-            _logger.log(Level.SEVERE, "Exception when writing to 
connection",e);
+        if (bytesRead == -1) {
+            close();
         }
     }
 
-    int processInput(byte[] bytes, int offset, int size)
+    void write() throws IOException
     {
-        int read = 0;
-        while (read < size)
+        int interest = _key.interestOps();
+        int start = _writeBuffer.position();
+        boolean done = false;
+        while (!done)
         {
-            offset = read;
-            switch (_state)
+            int produced = _transport.output(_writeBuffer.array(), 
_writeBuffer.position(), _writeBuffer.remaining());
+            _writeBuffer.position(_writeBuffer.position() + produced);
+            _writeBuffer.flip();
+            int wrote = _channel.write(_writeBuffer);
+            if (_logger.isLoggable(Level.FINE))
             {
-            case UNINITIALIZED:
-                read += readSasl(bytes, offset, size - offset);
-                if (isSaslDone())
-                {
-                    _state = _sasl.getState() == SaslState.PN_SASL_PASS ? 
ConnectorState.OPENED : ConnectorState.CLOSED;
-                }
-                break;
-            case OPENED:
-                read += readAMQPCommands(bytes, offset, size - offset);
-                break;
-            case EOS:
-            case CLOSED:
-                break;
+                _logger.log(Level.FINE, "wrote " + wrote + " bytes, " + 
_writeBuffer.remaining() + " remaining");
             }
-        }
-        return read;
-    }
-
-    void processOutput()
-    {
-        switch (_state)
-        {
-        case UNINITIALIZED:
-            writeSasl();
-            if (isSaslDone())
+            _writeBuffer.compact();
+            if (_writeBuffer.position() > 0)
             {
-                _state = _sasl.getState() == SaslState.PN_SASL_PASS ? 
ConnectorState.OPENED : ConnectorState.CLOSED;
+                //weren't able to write all available data, ask to be notified 
when we can write again
+                interest |= SelectionKey.OP_WRITE;
+                done = true;
+            }
+            else
+            {
+                //we are done if buffer was empty to begin with and we did not 
produce enough to fill it
+                interest &= ~SelectionKey.OP_WRITE;
+                done = start == 0 && produced < _writeBuffer.capacity();
+                start = 0;
             }
-            break;
-        case OPENED:
-            writeAMQPCommands();
-            break;
-        case EOS:
-            writeAMQPCommands();
-        case CLOSED:  // not a valid option
-            //TODO
-            break;
-        }
-    }
-
-    int readAMQPCommands(byte[] bytes, int offset, int size)
-    {
-        int consumed = _transport.input(bytes, offset, size);
-        if (consumed == END_OF_STREAM)
-        {
-            return size;
-        }
-        else
-        {
-            return consumed;
         }
+        _key.interestOps(interest);
     }
 
-    void writeAMQPCommands()
+    public Listener<C> listener()
     {
-        int size = _writeBuffer.array().length - _bytesNotWritten;
-        _bytesNotWritten += _transport.output(_writeBuffer.array(),
-                _bytesNotWritten, size);
+        return _listener;
     }
 
-    int readSasl(byte[] bytes, int offset, int size)
+    public Sasl sasl()
     {
-        int consumed = _sasl.input(bytes, offset, size);
-        if (consumed == END_OF_STREAM)
+        if (_transport != null)
         {
-            return size;
+            return _transport.sasl();
         }
         else
         {
-            return consumed;
+            return null;
         }
     }
 
-    void writeSasl()
-    {
-        int size = _writeBuffer.array().length - _bytesNotWritten;
-        _bytesNotWritten += _sasl.output(_writeBuffer.array(),
-                _bytesNotWritten, size);
-    }
-
-    public Listener<C> listener()
-    {
-        return _listener;
-    }
-
-    public Sasl sasl()
-    {
-        return _sasl;
-    }
-
     public Connection getConnection()
     {
         return _connection;
@@ -287,44 +177,29 @@ class ConnectorImpl<C> implements Connec
 
     public void close()
     {
-        if (_state == ConnectorState.CLOSED)
+        if (!isClosed())
         {
-            return;
-        }
-
-        try
-        {
-            // If the connection was closed due to authentication error
-            // then there might be data available to write on to the wire.
-            writeSasl();
-            writeAMQPCommands(); // write any closing commands
-            _channel.close();
-            _state = ConnectorState.CLOSED;
-        }
-        catch (IOException e)
-        {
-            _logger.log(Level.SEVERE, "Exception when closing connection",e);
+            try
+            {
+                write();
+                _channel.close();
+            }
+            catch (IOException e)
+            {
+                _logger.log(Level.SEVERE, "Exception when closing 
connection",e);
+            }
         }
     }
 
     public boolean isClosed()
     {
-        return _state == ConnectorState.EOS || _state == ConnectorState.CLOSED;
+        boolean result = !(_channel.isOpen() && _channel.isConnected());
+        return result;
     }
 
     public void destroy()
     {
         close(); // close if not closed already
-    }
-
-    private void setState(ConnectorState newState)
-    {
-        _state = newState;
-    }
-
-    private boolean isSaslDone()
-    {
-        SaslState state = _sasl.getState();
-        return state == SaslState.PN_SASL_PASS || state == 
SaslState.PN_SASL_FAIL;
+        _driver.removeConnector(this);
     }
 }

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
 Mon Dec 10 22:14:29 2012
@@ -29,8 +29,10 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -38,12 +40,13 @@ import java.util.logging.Logger;
 import org.apache.qpid.proton.driver.Connector;
 import org.apache.qpid.proton.driver.Driver;
 import org.apache.qpid.proton.driver.Listener;
-import org.apache.qpid.proton.engine.impl.SaslImpl;
 
 public class DriverImpl implements Driver
 {
     private Selector _selector;
     private Set<SelectionKey> _selectedKeys = Collections.emptySet();
+    private Collection<Listener> _listeners = new LinkedList();
+    private Collection<Connector> _connectors = new LinkedList();
     private Logger _logger = Logger.getLogger("proton.driver");
 
     public DriverImpl() throws IOException
@@ -56,7 +59,7 @@ public class DriverImpl implements Drive
         _selector.wakeup();
     }
 
-    public void doWait(int timeout)
+    public void doWait(long timeout)
     {
         try
         {
@@ -146,8 +149,9 @@ public class DriverImpl implements Drive
             selectedIter.remove();
             if(key.isReadable() || key.isWritable())
             {
-                return (Connector) key.attachment();
-
+                ConnectorImpl c = (ConnectorImpl) key.attachment();
+                c.selected();
+                return c;
             }
         }
         return null;
@@ -165,6 +169,8 @@ public class DriverImpl implements Drive
             _logger.log(Level.SEVERE, "Exception when closing selector",e);
             throw new RuntimeException(e);
         }
+        _listeners.clear();
+        _connectors.clear();
     }
 
     public <C> Listener<C> createListener(String host, int port, C context)
@@ -193,6 +199,7 @@ public class DriverImpl implements Drive
         Listener<C> l = new ListenerImpl<C>(this, c, context);
         SelectionKey key = registerInterest(c,SelectionKey.OP_ACCEPT);
         key.attach(l);
+        _listeners.add(l);
         return l;
     }
 
@@ -215,21 +222,34 @@ public class DriverImpl implements Drive
 
     public <C> Connector<C> createConnector(SelectableChannel c, C context)
     {
-        SelectionKey key = registerInterest(c,SelectionKey.OP_READ);
-        SaslImpl sasl = new SaslImpl();
-        sasl.client();
-        Connector<C> co = new ConnectorImpl<C>(this, null, 
sasl,(SocketChannel)c, context, key);
+        SelectionKey key = registerInterest(c, SelectionKey.OP_READ | 
SelectionKey.OP_WRITE);
+        Connector<C> co = new ConnectorImpl<C>(this, null, (SocketChannel)c, 
context, key);
         key.attach(co);
+        _connectors.add(co);
         return co;
     }
 
+    public <C> void removeConnector(Connector<C> c)
+    {
+        _connectors.remove(c);
+    }
+
+    public Iterable<Listener> listeners()
+    {
+        return _listeners;
+    }
+
+    public Iterable<Connector> connectors()
+    {
+        return _connectors;
+    }
+
     protected <C> Connector<C> createServerConnector(SelectableChannel c, C 
context, Listener<C> l)
     {
-        SelectionKey key = registerInterest(c,SelectionKey.OP_READ);
-        SaslImpl sasl = new SaslImpl();
-        sasl.server();
-        Connector<C> co = new ConnectorImpl<C>(this, l, sasl,(SocketChannel)c, 
context, key);
+        SelectionKey key = registerInterest(c, SelectionKey.OP_READ | 
SelectionKey.OP_WRITE);
+        Connector<C> co = new ConnectorImpl<C>(this, l, (SocketChannel)c, 
context, key);
         key.attach(co);
+        _connectors.add(co);
         return co;
     }
 

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
 Mon Dec 10 22:14:29 2012
@@ -66,20 +66,8 @@ class ListenerImpl<C> implements Listene
         return _context;
     }
 
-    public void close()
+    public void close() throws IOException
     {
-        try
-        {
-            _channel.close();
-        }
-        catch (IOException e)
-        {
-            _logger.log(Level.SEVERE, "Exception when closing listener",e);
-        }
-    }
-
-    public void destroy()
-    {
-        close();
+        _channel.socket().close();
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to