Author: rajith
Date: Wed Oct 10 17:18:36 2012
New Revision: 1396697

URL: http://svn.apache.org/viewvc?rev=1396697&view=rev
Log:
PROTON-66 Adding the driver implementation. This is based on the work I
have done in my branch with minor modifications to adjust for API
changes since then.

Added:
    
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
Removed:
    
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
Modified:
    
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
    
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java

Added: 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1396697&view=auto
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
 (added)
+++ 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
 Wed Oct 10 17:18:36 2012
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.driver.impl;
+
+import static 
org.apache.qpid.proton.driver.impl.ConnectorImpl.ConnectorState.UNINITIALIZED;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sasl.SaslState;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.TransportFactory;
+
+class ConnectorImpl<C> implements Connector<C>
+{
+    public static int END_OF_STREAM = -1;
+    private static int DEFAULT_BUFFER_SIZE = 64 * 1024;
+    private static int readBufferSize = Integer.getInteger
+        ("pn.receive_buffer_size", DEFAULT_BUFFER_SIZE);
+    private static int writeBufferSize = Integer.getInteger
+        ("pn.send_buffer_size", DEFAULT_BUFFER_SIZE);
+
+    enum ConnectorState {UNINITIALIZED, OPENED, EOS, CLOSED};
+
+    private final Sasl _sasl;
+    private final DriverImpl _driver;
+    private final Listener<C> _listener;
+    private final SocketChannel _channel;
+    private final Logger _logger = Logger.getLogger("proton.driver");
+    private C _context;
+
+    private Connection _connection;
+    private SelectionKey _key;
+    private ConnectorState _state = UNINITIALIZED;
+
+    private ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize);
+    private int _bytesNotRead = 0;
+
+    private int _bytesNotWritten = 0;
+    private ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize);
+    private Transport _transport = null;
+
+    ConnectorImpl(DriverImpl driver, Listener<C> listener, Sasl sasl, 
SocketChannel c, C context, SelectionKey key)
+    {
+        _driver = driver;
+        _listener = listener;
+        _channel = c;
+        _sasl = sasl;
+        _context = context;
+        _key = key;
+    }
+
+    public void process()
+    {
+        if (_channel.isConnectionPending())
+        {
+            try
+            {
+                _channel.finishConnect();
+            }
+            catch (IOException io)
+            {
+                throw new RuntimeException("Exception will trying to complete 
connection",io);
+            }
+        }
+
+        if (!_channel.isOpen())
+        {
+            _state = ConnectorState.CLOSED;
+            return;
+        }
+
+        if (_key.isReadable())
+        {
+            read();
+        }
+
+        if (_key.isWritable())
+        {
+            write();
+        }
+    }
+
+    void read()
+    {
+        try
+        {
+            int  bytesRead = _channel.read(_readBuffer);
+            int consumed = 0;
+            while (bytesRead > 0)
+            {
+                consumed = processInput(_readBuffer.array(), 0, bytesRead + 
_bytesNotRead);
+                if (consumed < bytesRead)
+                {
+                    _readBuffer.compact();
+                    _bytesNotRead = bytesRead - consumed;
+                }
+                else
+                {
+                    _readBuffer.rewind();
+                    _bytesNotRead = 0;
+                }
+                bytesRead = _channel.read(_readBuffer);
+            }
+            if (bytesRead == -1)
+            {
+                _state = ConnectorState.EOS;
+            }
+        }
+        catch (IOException e)
+        {
+            _logger.log(Level.SEVERE, "Exception when reading from 
connection",e);
+        }
+    }
+
+    void write()
+    {
+        try
+        {
+            processOutput();
+            if (_bytesNotWritten > 0)
+            {
+                _writeBuffer.limit(_bytesNotWritten);
+                int written = _channel.write(_writeBuffer);
+                if (_writeBuffer.hasRemaining())
+                {
+                    _writeBuffer.compact();
+                    _bytesNotWritten = _bytesNotWritten - written;
+                }
+                else
+                {
+                    _writeBuffer.clear();
+                    _bytesNotWritten = 0;
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            _logger.log(Level.SEVERE, "Exception when writing to 
connection",e);
+        }
+    }
+
+    int processInput(byte[] bytes, int offset, int size)
+    {
+        int read = 0;
+        while (read < size)
+        {
+            offset = read;
+            switch (_state)
+            {
+            case UNINITIALIZED:
+                read += readSasl(bytes, offset, size - offset);
+                if (isSaslDone())
+                {
+                    _state = _sasl.getState() == SaslState.PN_SASL_PASS ? 
ConnectorState.OPENED : ConnectorState.CLOSED;
+                }
+                break;
+            case OPENED:
+                read += readAMQPCommands(bytes, offset, size - offset);
+                break;
+            case EOS:
+            case CLOSED:
+                break;
+            }
+        }
+        return read;
+    }
+
+    void processOutput()
+    {
+        switch (_state)
+        {
+        case UNINITIALIZED:
+            writeSasl();
+            if (isSaslDone())
+            {
+                _state = _sasl.getState() == SaslState.PN_SASL_PASS ? 
ConnectorState.OPENED : ConnectorState.CLOSED;
+            }
+            break;
+        case OPENED:
+            writeAMQPCommands();
+            break;
+        case EOS:
+            writeAMQPCommands();
+        case CLOSED:  // not a valid option
+            //TODO
+            break;
+        }
+    }
+
+    int readAMQPCommands(byte[] bytes, int offset, int size)
+    {
+        int consumed = _transport.input(bytes, offset, size);
+        if (consumed == END_OF_STREAM)
+        {
+            return size;
+        }
+        else
+        {
+            return consumed;
+        }
+    }
+
+    void writeAMQPCommands()
+    {
+        int size = _writeBuffer.array().length - _bytesNotWritten;
+        _bytesNotWritten += _transport.output(_writeBuffer.array(),
+                _bytesNotWritten, size);
+    }
+
+    int readSasl(byte[] bytes, int offset, int size)
+    {
+        int consumed = _sasl.input(bytes, offset, size);
+        if (consumed == END_OF_STREAM)
+        {
+            return size;
+        }
+        else
+        {
+            return consumed;
+        }
+    }
+
+    void writeSasl()
+    {
+        int size = _writeBuffer.array().length - _bytesNotWritten;
+        _bytesNotWritten += _sasl.output(_writeBuffer.array(),
+                _bytesNotWritten, size);
+    }
+
+    public Listener<C> listener()
+    {
+        return _listener;
+    }
+
+    public Sasl sasl()
+    {
+        return _sasl;
+    }
+
+    public Connection getConnection()
+    {
+        return _connection;
+    }
+
+    public void setConnection(Connection connection)
+    {
+        _connection = connection;
+        _transport = 
TransportFactory.getDefaultTransportFactory().transport(_connection);
+    }
+
+    public C getContext()
+    {
+        return _context;
+    }
+
+    public void setContext(C context)
+    {
+        _context = context;
+    }
+
+    public void close()
+    {
+        if (_state == ConnectorState.CLOSED)
+        {
+            return;
+        }
+
+        try
+        {
+            // If the connection was closed due to authentication error
+            // then there might be data available to write on to the wire.
+            writeSasl();
+            writeAMQPCommands(); // write any closing commands
+            _channel.close();
+            _state = ConnectorState.CLOSED;
+        }
+        catch (IOException e)
+        {
+            _logger.log(Level.SEVERE, "Exception when closing connection",e);
+        }
+    }
+
+    public boolean isClosed()
+    {
+        return _state == ConnectorState.EOS || _state == ConnectorState.CLOSED;
+    }
+
+    public void destroy()
+    {
+        close(); // close if not closed already
+    }
+
+    private void setState(ConnectorState newState)
+    {
+        _state = newState;
+    }
+
+    private boolean isSaslDone()
+    {
+        SaslState state = _sasl.getState();
+        return state == SaslState.PN_SASL_PASS || state == 
SaslState.PN_SASL_FAIL;
+    }
+}

Modified: 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1396697&r1=1396696&r2=1396697&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
 Wed Oct 10 17:18:36 2012
@@ -1,20 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.proton.driver.impl;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.nio.channels.*;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.qpid.proton.driver.Connector;
 import org.apache.qpid.proton.driver.Driver;
 import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.impl.SaslClientImpl;
+import org.apache.qpid.proton.engine.impl.SaslServerImpl;
 
 public class DriverImpl implements Driver
 {
     private Selector _selector;
     private Set<SelectionKey> _selectedKeys = Collections.emptySet();
+    private Logger _logger = Logger.getLogger("proton.driver");
 
     public DriverImpl() throws IOException
     {
@@ -35,10 +66,12 @@ public class DriverImpl implements Drive
         }
         catch (IOException e)
         {
-            e.printStackTrace();  // TODO - Implement
+            _logger.log(Level.SEVERE, "Exception when waiting for IO Event",e);
+            throw new RuntimeException(e);
         }
     }
 
+    @SuppressWarnings("rawtypes")
     public Listener listener()
     {
         Listener listener = null;
@@ -51,7 +84,8 @@ public class DriverImpl implements Drive
             }
             catch (IOException e)
             {
-                e.printStackTrace();  // TODO - Implement
+                _logger.log(Level.SEVERE, "Exception when selecting",e);
+                throw new RuntimeException(e);
             }
             listener = getFirstListener();
         }
@@ -64,6 +98,7 @@ public class DriverImpl implements Drive
         _selectedKeys = _selector.selectedKeys();
     }
 
+    @SuppressWarnings("rawtypes")
     private Listener getFirstListener()
     {
         Iterator<SelectionKey> selectedIter = _selectedKeys.iterator();
@@ -74,14 +109,13 @@ public class DriverImpl implements Drive
             selectedIter.remove();
             if(key.isAcceptable())
             {
-                selectedIter.remove();
                 return (Listener) key.attachment();
-
             }
         }
         return null;
     }
 
+    @SuppressWarnings("rawtypes")
     public Connector connector()
     {
         Connector connector = null;
@@ -94,13 +128,15 @@ public class DriverImpl implements Drive
             }
             catch (IOException e)
             {
-                e.printStackTrace();  // TODO - Implement
+                _logger.log(Level.SEVERE, "Exception when selecting",e);
+                throw new RuntimeException(e);
             }
             connector = getFirstConnector();
         }
         return connector;
     }
 
+    @SuppressWarnings("rawtypes")
     private Connector getFirstConnector()
     {
         Iterator<SelectionKey> selectedIter = _selectedKeys.iterator();
@@ -111,7 +147,6 @@ public class DriverImpl implements Drive
             selectedIter.remove();
             if(key.isReadable() || key.isWritable())
             {
-                selectedIter.remove();
                 return (Connector) key.attachment();
 
             }
@@ -122,7 +157,15 @@ public class DriverImpl implements Drive
 
     public void destroy()
     {
-        //TODO - Implement
+        try
+        {
+            _selector.close();
+        }
+        catch (IOException e)
+        {
+            _logger.log(Level.SEVERE, "Exception when closing selector",e);
+            throw new RuntimeException(e);
+        }
     }
 
     public <C> Listener<C> createListener(String host, int port, C context)
@@ -148,25 +191,55 @@ public class DriverImpl implements Drive
 
     public <C> Listener<C> createListener(ServerSocketChannel c, C context)
     {
+        Listener<C> l = new ListenerImpl<C>(this, c, context);
+        SelectionKey key = registerInterest(c,SelectionKey.OP_ACCEPT);
+        key.attach(l);
+        return l;
+    }
+
+    public <C> Connector<C> createConnector(String host, int port, C context)
+    {
         try
         {
-            c.register(_selector, SelectionKey.OP_ACCEPT);
-            return new ListenerImpl<C>(this, c, context);
+            SocketChannel channel = SocketChannel.open();
+            channel.configureBlocking(false);
+            channel.connect(new InetSocketAddress(host, port));
+            return createConnector(channel, context);
         }
-        catch (ClosedChannelException e)
+        catch (IOException e)
         {
-            e.printStackTrace();  // TODO - Implement
+            // TODO Auto-generated catch block
+            e.printStackTrace();
             throw new RuntimeException(e);
         }
     }
 
-    public <C> Connector<C> createConnector(String host, int port, C context)
+    public <C> Connector<C> createConnector(SelectableChannel c, C context)
     {
-        return null;  //TODO - Implement
+        SelectionKey key = registerInterest(c,SelectionKey.OP_READ | 
SelectionKey.OP_WRITE);
+        Connector<C> co = new ConnectorImpl<C>(this, null, new 
SaslClientImpl(),(SocketChannel)c, context, key);
+        key.attach(co);
+        return co;
     }
 
-    public <C> Connector<C> createConnector(SelectableChannel fd, C context)
+    protected <C> Connector<C> createServerConnector(SelectableChannel c, C 
context, Listener<C> l)
     {
-        return null;  //TODO - Implement
+        SelectionKey key = registerInterest(c,SelectionKey.OP_READ | 
SelectionKey.OP_WRITE);
+        Connector<C> co = new ConnectorImpl<C>(this, l, new 
SaslServerImpl(),(SocketChannel)c, context, key);
+        key.attach(co);
+        return co;
+    }
+
+    private <C> SelectionKey registerInterest(SelectableChannel c, int opKeys)
+    {
+        try
+        {
+            return c.register(_selector, opKeys);
+        }
+        catch (ClosedChannelException e)
+        {
+            e.printStackTrace();  // TODO - Implement
+            throw new RuntimeException(e);
+        }
     }
 }

Modified: 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java?rev=1396697&r1=1396696&r2=1396697&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
 Wed Oct 10 17:18:36 2012
@@ -1,8 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.proton.driver.impl;
 
 import java.io.IOException;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.qpid.proton.driver.Connector;
 import org.apache.qpid.proton.driver.Listener;
 
@@ -11,6 +34,7 @@ class ListenerImpl<C> implements Listene
     private final C _context;
     private final ServerSocketChannel _channel;
     private final DriverImpl _driver;
+    private final Logger _logger = Logger.getLogger("proton.driver");
 
     ListenerImpl(DriverImpl driver, ServerSocketChannel c, C context)
     {
@@ -19,7 +43,7 @@ class ListenerImpl<C> implements Listene
         _context = context;
     }
 
-    public Connector accept()
+    public Connector<C> accept()
     {
         try
         {
@@ -27,14 +51,14 @@ class ListenerImpl<C> implements Listene
             if(c != null)
             {
                 c.configureBlocking(false);
-                return new ServerConnectorImpl(_driver,c);
+                return _driver.createServerConnector(c, _context, this);
             }
         }
         catch (IOException e)
         {
-            e.printStackTrace();  // TODO - Implement
+            _logger.log(Level.SEVERE, "Exception when accepting connection",e);
         }
-        return null;  //TODO - Implement
+        return null;  //TODO - we should probably throw an exception instead 
of returning null?
     }
 
     public C getContext()
@@ -44,11 +68,18 @@ class ListenerImpl<C> implements Listene
 
     public void close()
     {
-        //TODO - Implement
+        try
+        {
+            _channel.close();
+        }
+        catch (IOException e)
+        {
+            _logger.log(Level.SEVERE, "Exception when closing listener",e);
+        }
     }
 
     public void destroy()
     {
-        //TODO - Implement
+        close();
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to