Author: rajith
Date: Wed Oct 10 17:18:36 2012
New Revision: 1396697
URL: http://svn.apache.org/viewvc?rev=1396697&view=rev
Log:
PROTON-66 Adding the driver implementation. This is based on the work I
have done in my branch with minor modifications to adjust for API
changes since then.
Added:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
Removed:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
Added:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1396697&view=auto
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
(added)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
Wed Oct 10 17:18:36 2012
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.driver.impl;
+
+import static
org.apache.qpid.proton.driver.impl.ConnectorImpl.ConnectorState.UNINITIALIZED;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sasl.SaslState;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.TransportFactory;
+
+/**
+ * Connector that moves bytes between a {@link SocketChannel} and the protocol
+ * layers: the {@link Sasl} layer while the connector is UNINITIALIZED and the
+ * AMQP {@link Transport} once SASL has passed (state OPENED).
+ *
+ * <p>Buffer conventions used throughout this class:
+ * <ul>
+ * <li>{@code _readBuffer} is always left in "fill" mode: bytes that were read
+ *     but not yet consumed sit at {@code [0, _bytesNotRead)} and the position
+ *     equals {@code _bytesNotRead}.</li>
+ * <li>{@code _writeBuffer}'s backing array holds pending output at
+ *     {@code [0, _bytesNotWritten)}; producers append directly to the array
+ *     and the buffer's position/limit are only set up while flushing.</li>
+ * </ul>
+ *
+ * <p>NOTE(review): nothing here is synchronized; presumably a connector is
+ * only driven from the driver's thread - confirm against DriverImpl usage.
+ */
+class ConnectorImpl<C> implements Connector<C>
+{
+    /** Sentinel compared against the return value of the SASL/transport input calls. */
+    public static final int END_OF_STREAM = -1;
+    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+    private static final int readBufferSize =
+        Integer.getInteger("pn.receive_buffer_size", DEFAULT_BUFFER_SIZE);
+    private static final int writeBufferSize =
+        Integer.getInteger("pn.send_buffer_size", DEFAULT_BUFFER_SIZE);
+
+    enum ConnectorState {UNINITIALIZED, OPENED, EOS, CLOSED}
+
+    private final Sasl _sasl;
+    private final DriverImpl _driver;
+    private final Listener<C> _listener;
+    private final SocketChannel _channel;
+    private final Logger _logger = Logger.getLogger("proton.driver");
+    private C _context;
+
+    private Connection _connection;
+    private final SelectionKey _key;
+    private ConnectorState _state = UNINITIALIZED;
+
+    private final ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize);
+    private int _bytesNotRead = 0;
+
+    private int _bytesNotWritten = 0;
+    private final ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize);
+    private Transport _transport = null;
+
+    /**
+     * @param driver   the owning driver
+     * @param listener the listener this connector was accepted from (may be null)
+     * @param sasl     the SASL layer used until authentication completes
+     * @param c        the socket channel to read from / write to
+     * @param context  application context object
+     * @param key      the selection key registered for {@code c}
+     */
+    ConnectorImpl(DriverImpl driver, Listener<C> listener, Sasl sasl,
+                  SocketChannel c, C context, SelectionKey key)
+    {
+        _driver = driver;
+        _listener = listener;
+        _channel = c;
+        _sasl = sasl;
+        _context = context;
+        _key = key;
+    }
+
+    /**
+     * Completes a pending connect if there is one, then performs whatever
+     * read/write work the selection key reports as ready.
+     *
+     * @throws RuntimeException if completing the connection fails
+     */
+    public void process()
+    {
+        if (_channel.isConnectionPending())
+        {
+            try
+            {
+                if (!_channel.finishConnect())
+                {
+                    // Still connecting: reading or writing now would fail.
+                    return;
+                }
+            }
+            catch (IOException io)
+            {
+                throw new RuntimeException(
+                    "Exception while trying to complete connection", io);
+            }
+        }
+
+        if (!_channel.isOpen())
+        {
+            setState(ConnectorState.CLOSED);
+            return;
+        }
+
+        if (_key.isReadable())
+        {
+            read();
+        }
+
+        if (_key.isWritable())
+        {
+            write();
+        }
+    }
+
+    /**
+     * Drains the channel into the read buffer and hands the accumulated bytes
+     * to {@link #processInput}. Bytes the protocol layer did not consume are
+     * moved to the front of the buffer and offered again on the next pass.
+     * Moves to EOS when the channel reports end of stream.
+     */
+    void read()
+    {
+        try
+        {
+            int bytesRead = _channel.read(_readBuffer);
+            while (bytesRead > 0)
+            {
+                // Position == retained bytes + bytes just read.
+                int available = _readBuffer.position();
+                int consumed = Math.min(
+                    processInput(_readBuffer.array(), 0, available), available);
+
+                // Drop the consumed prefix, keep the remainder at the front and
+                // leave the buffer ready for the next channel read.
+                _readBuffer.flip();
+                _readBuffer.position(consumed);
+                _readBuffer.compact();
+                _bytesNotRead = _readBuffer.position();
+
+                // Returns 0 (ending the loop) when the buffer is full of
+                // unconsumed bytes or the channel has nothing more for now.
+                bytesRead = _channel.read(_readBuffer);
+            }
+            if (bytesRead == END_OF_STREAM)
+            {
+                setState(ConnectorState.EOS);
+            }
+        }
+        catch (IOException e)
+        {
+            _logger.log(Level.SEVERE, "Exception when reading from connection", e);
+        }
+    }
+
+    /**
+     * Collects pending output from the current protocol layer and writes as
+     * much of it as the channel accepts; the rest is kept for the next call.
+     */
+    void write()
+    {
+        try
+        {
+            processOutput();
+            flushWriteBuffer();
+        }
+        catch (IOException e)
+        {
+            _logger.log(Level.SEVERE, "Exception when writing to connection", e);
+        }
+    }
+
+    /**
+     * Feeds {@code size} bytes starting at {@code offset} to the layer selected
+     * by the current state.
+     *
+     * @return the number of bytes consumed. Stops early (returning fewer than
+     *         {@code size}) when the layer makes no progress, so the caller can
+     *         retry once more data has arrived. Input that arrives in the EOS
+     *         or CLOSED state is discarded and reported as consumed.
+     */
+    int processInput(byte[] bytes, int offset, int size)
+    {
+        int read = 0;
+        while (read < size)
+        {
+            int consumed;
+            switch (_state)
+            {
+                case UNINITIALIZED:
+                    consumed = readSasl(bytes, offset + read, size - read);
+                    updateStateIfSaslDone();
+                    break;
+                case OPENED:
+                    consumed = readAMQPCommands(bytes, offset + read, size - read);
+                    break;
+                case EOS:
+                case CLOSED:
+                default:
+                    // No layer will ever accept these bytes; drop them rather
+                    // than loop forever or let them clog the read buffer.
+                    return size;
+            }
+            if (consumed <= 0)
+            {
+                // No progress (e.g. incomplete frame): wait for more input.
+                break;
+            }
+            read += consumed;
+        }
+        return read;
+    }
+
+    /**
+     * Asks the layer selected by the current state to append its pending
+     * output to the write buffer. Nothing is produced in the CLOSED state.
+     */
+    void processOutput()
+    {
+        switch (_state)
+        {
+            case UNINITIALIZED:
+                writeSasl();
+                updateStateIfSaslDone();
+                break;
+            case OPENED:
+            case EOS:
+                writeAMQPCommands();
+                break;
+            case CLOSED:
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Passes input to the AMQP transport.
+     *
+     * @return bytes consumed; {@code size} if the transport reports
+     *         END_OF_STREAM; 0 if no transport has been set yet
+     */
+    int readAMQPCommands(byte[] bytes, int offset, int size)
+    {
+        if (_transport == null)
+        {
+            // setConnection() has not been called yet; keep the bytes.
+            return 0;
+        }
+        int consumed = _transport.input(bytes, offset, size);
+        return consumed == END_OF_STREAM ? size : consumed;
+    }
+
+    /** Appends pending AMQP transport output, if any, to the write buffer. */
+    void writeAMQPCommands()
+    {
+        int free = _writeBuffer.capacity() - _bytesNotWritten;
+        if (_transport == null || free <= 0)
+        {
+            return;
+        }
+        int produced = _transport.output(_writeBuffer.array(), _bytesNotWritten, free);
+        // NOTE(review): a negative return is assumed to mean "no more output";
+        // it must never shrink the pending count.
+        if (produced > 0)
+        {
+            _bytesNotWritten += produced;
+        }
+    }
+
+    /**
+     * Passes input to the SASL layer.
+     *
+     * @return bytes consumed, or {@code size} if SASL reports END_OF_STREAM
+     */
+    int readSasl(byte[] bytes, int offset, int size)
+    {
+        int consumed = _sasl.input(bytes, offset, size);
+        return consumed == END_OF_STREAM ? size : consumed;
+    }
+
+    /** Appends pending SASL output, if any, to the write buffer. */
+    void writeSasl()
+    {
+        int free = _writeBuffer.capacity() - _bytesNotWritten;
+        if (free <= 0)
+        {
+            return;
+        }
+        int produced = _sasl.output(_writeBuffer.array(), _bytesNotWritten, free);
+        if (produced > 0)
+        {
+            _bytesNotWritten += produced;
+        }
+    }
+
+    public Listener<C> listener()
+    {
+        return _listener;
+    }
+
+    public Sasl sasl()
+    {
+        return _sasl;
+    }
+
+    public Connection getConnection()
+    {
+        return _connection;
+    }
+
+    /** Stores the connection and creates the transport bound to it. */
+    public void setConnection(Connection connection)
+    {
+        _connection = connection;
+        _transport =
+            TransportFactory.getDefaultTransportFactory().transport(_connection);
+    }
+
+    public C getContext()
+    {
+        return _context;
+    }
+
+    public void setContext(C context)
+    {
+        _context = context;
+    }
+
+    /**
+     * Makes a best-effort attempt to flush any remaining SASL/AMQP output and
+     * then closes the channel. Safe to call more than once.
+     *
+     * <p>The guard is on the channel rather than on the state because the
+     * state is also set to CLOSED when SASL fails, while the socket is still
+     * open and may have an outcome to send.
+     */
+    public void close()
+    {
+        if (!_channel.isOpen())
+        {
+            setState(ConnectorState.CLOSED);
+            return;
+        }
+
+        try
+        {
+            if (_channel.isConnected())
+            {
+                // If the connection was closed due to authentication error
+                // then there might be data available to write on to the wire.
+                writeSasl();
+                writeAMQPCommands(); // write any closing commands
+                flushWriteBuffer();
+            }
+        }
+        catch (IOException e)
+        {
+            _logger.log(Level.SEVERE, "Exception when closing connection", e);
+        }
+        finally
+        {
+            closeChannel();
+        }
+    }
+
+    /** @return true once the peer has ended the stream or the connector is closed */
+    public boolean isClosed()
+    {
+        return _state == ConnectorState.EOS || _state == ConnectorState.CLOSED;
+    }
+
+    public void destroy()
+    {
+        close(); // close if not closed already
+    }
+
+    /**
+     * Writes the pending bytes {@code [0, _bytesNotWritten)} to the channel
+     * and moves whatever the channel did not accept back to the front.
+     */
+    private void flushWriteBuffer() throws IOException
+    {
+        if (_bytesNotWritten <= 0)
+        {
+            return;
+        }
+        // Reset the position as well as the limit: a previous compact() leaves
+        // the position at the number of retained bytes.
+        _writeBuffer.clear();
+        _writeBuffer.limit(_bytesNotWritten);
+        _channel.write(_writeBuffer);
+        _writeBuffer.compact();
+        _bytesNotWritten = _writeBuffer.position();
+    }
+
+    /** Closes the channel, logging (not propagating) any failure. */
+    private void closeChannel()
+    {
+        try
+        {
+            _channel.close();
+        }
+        catch (IOException e)
+        {
+            _logger.log(Level.SEVERE, "Exception when closing connection", e);
+        }
+        setState(ConnectorState.CLOSED);
+    }
+
+    /** Leaves UNINITIALIZED once SASL has finished: OPENED on pass, CLOSED on fail. */
+    private void updateStateIfSaslDone()
+    {
+        if (isSaslDone())
+        {
+            setState(_sasl.getState() == SaslState.PN_SASL_PASS
+                     ? ConnectorState.OPENED : ConnectorState.CLOSED);
+        }
+    }
+
+    private void setState(ConnectorState newState)
+    {
+        _state = newState;
+    }
+
+    private boolean isSaslDone()
+    {
+        SaslState state = _sasl.getState();
+        return state == SaslState.PN_SASL_PASS || state == SaslState.PN_SASL_FAIL;
+    }
+}
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1396697&r1=1396696&r2=1396697&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
(original)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
Wed Oct 10 17:18:36 2012
@@ -1,20 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.proton.driver.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
-import java.nio.channels.*;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.qpid.proton.driver.Connector;
import org.apache.qpid.proton.driver.Driver;
import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.impl.SaslClientImpl;
+import org.apache.qpid.proton.engine.impl.SaslServerImpl;
public class DriverImpl implements Driver
{
private Selector _selector;
private Set<SelectionKey> _selectedKeys = Collections.emptySet();
+ private Logger _logger = Logger.getLogger("proton.driver");
public DriverImpl() throws IOException
{
@@ -35,10 +66,12 @@ public class DriverImpl implements Drive
}
catch (IOException e)
{
- e.printStackTrace(); // TODO - Implement
+ _logger.log(Level.SEVERE, "Exception when waiting for IO Event",e);
+ throw new RuntimeException(e);
}
}
+ @SuppressWarnings("rawtypes")
public Listener listener()
{
Listener listener = null;
@@ -51,7 +84,8 @@ public class DriverImpl implements Drive
}
catch (IOException e)
{
- e.printStackTrace(); // TODO - Implement
+ _logger.log(Level.SEVERE, "Exception when selecting",e);
+ throw new RuntimeException(e);
}
listener = getFirstListener();
}
@@ -64,6 +98,7 @@ public class DriverImpl implements Drive
_selectedKeys = _selector.selectedKeys();
}
+ @SuppressWarnings("rawtypes")
private Listener getFirstListener()
{
Iterator<SelectionKey> selectedIter = _selectedKeys.iterator();
@@ -74,14 +109,13 @@ public class DriverImpl implements Drive
selectedIter.remove();
if(key.isAcceptable())
{
- selectedIter.remove();
return (Listener) key.attachment();
-
}
}
return null;
}
+ @SuppressWarnings("rawtypes")
public Connector connector()
{
Connector connector = null;
@@ -94,13 +128,15 @@ public class DriverImpl implements Drive
}
catch (IOException e)
{
- e.printStackTrace(); // TODO - Implement
+ _logger.log(Level.SEVERE, "Exception when selecting",e);
+ throw new RuntimeException(e);
}
connector = getFirstConnector();
}
return connector;
}
+ @SuppressWarnings("rawtypes")
private Connector getFirstConnector()
{
Iterator<SelectionKey> selectedIter = _selectedKeys.iterator();
@@ -111,7 +147,6 @@ public class DriverImpl implements Drive
selectedIter.remove();
if(key.isReadable() || key.isWritable())
{
- selectedIter.remove();
return (Connector) key.attachment();
}
@@ -122,7 +157,15 @@ public class DriverImpl implements Drive
public void destroy()
{
- //TODO - Implement
+ try
+ {
+ _selector.close();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, "Exception when closing selector",e);
+ throw new RuntimeException(e);
+ }
}
public <C> Listener<C> createListener(String host, int port, C context)
@@ -148,25 +191,55 @@ public class DriverImpl implements Drive
public <C> Listener<C> createListener(ServerSocketChannel c, C context)
{
+ Listener<C> l = new ListenerImpl<C>(this, c, context);
+ SelectionKey key = registerInterest(c,SelectionKey.OP_ACCEPT);
+ key.attach(l);
+ return l;
+ }
+
+ public <C> Connector<C> createConnector(String host, int port, C context)
+ {
try
{
- c.register(_selector, SelectionKey.OP_ACCEPT);
- return new ListenerImpl<C>(this, c, context);
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ channel.connect(new InetSocketAddress(host, port));
+ return createConnector(channel, context);
}
- catch (ClosedChannelException e)
+ catch (IOException e)
{
- e.printStackTrace(); // TODO - Implement
+ // TODO Auto-generated catch block
+ e.printStackTrace();
throw new RuntimeException(e);
}
}
- public <C> Connector<C> createConnector(String host, int port, C context)
+ public <C> Connector<C> createConnector(SelectableChannel c, C context)
{
- return null; //TODO - Implement
+ SelectionKey key = registerInterest(c,SelectionKey.OP_READ |
SelectionKey.OP_WRITE);
+ Connector<C> co = new ConnectorImpl<C>(this, null, new
SaslClientImpl(),(SocketChannel)c, context, key);
+ key.attach(co);
+ return co;
}
- public <C> Connector<C> createConnector(SelectableChannel fd, C context)
+ protected <C> Connector<C> createServerConnector(SelectableChannel c, C
context, Listener<C> l)
{
- return null; //TODO - Implement
+ SelectionKey key = registerInterest(c,SelectionKey.OP_READ |
SelectionKey.OP_WRITE);
+ Connector<C> co = new ConnectorImpl<C>(this, l, new
SaslServerImpl(),(SocketChannel)c, context, key);
+ key.attach(co);
+ return co;
+ }
+
+ private <C> SelectionKey registerInterest(SelectableChannel c, int opKeys)
+ {
+ try
+ {
+ return c.register(_selector, opKeys);
+ }
+ catch (ClosedChannelException e)
+ {
+ e.printStackTrace(); // TODO - Implement
+ throw new RuntimeException(e);
+ }
}
}
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java?rev=1396697&r1=1396696&r2=1396697&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
(original)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
Wed Oct 10 17:18:36 2012
@@ -1,8 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.proton.driver.impl;
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.qpid.proton.driver.Connector;
import org.apache.qpid.proton.driver.Listener;
@@ -11,6 +34,7 @@ class ListenerImpl<C> implements Listene
private final C _context;
private final ServerSocketChannel _channel;
private final DriverImpl _driver;
+ private final Logger _logger = Logger.getLogger("proton.driver");
ListenerImpl(DriverImpl driver, ServerSocketChannel c, C context)
{
@@ -19,7 +43,7 @@ class ListenerImpl<C> implements Listene
_context = context;
}
- public Connector accept()
+ public Connector<C> accept()
{
try
{
@@ -27,14 +51,14 @@ class ListenerImpl<C> implements Listene
if(c != null)
{
c.configureBlocking(false);
- return new ServerConnectorImpl(_driver,c);
+ return _driver.createServerConnector(c, _context, this);
}
}
catch (IOException e)
{
- e.printStackTrace(); // TODO - Implement
+ _logger.log(Level.SEVERE, "Exception when accepting connection",e);
}
- return null; //TODO - Implement
+ return null; //TODO - we should probably throw an exception instead
of returning null?
}
public C getContext()
@@ -44,11 +68,18 @@ class ListenerImpl<C> implements Listene
public void close()
{
- //TODO - Implement
+ try
+ {
+ _channel.close();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, "Exception when closing listener",e);
+ }
}
public void destroy()
{
- //TODO - Implement
+ close();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]